net/webrtc: implement AWS KVS signaller

And expose a wrapper webrtcsink variant, aws-kvs-webrtcsink.

This adds support in webrtcsink for processing a consumer offer, instead
of producing one.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1114>
This commit is contained in:
Mathieu Duponchelle 2023-03-02 00:01:43 +01:00 committed by Sebastian Dröge
parent 82f1789589
commit 584392049c
10 changed files with 1743 additions and 219 deletions

View file

@ -5904,6 +5904,37 @@
"rswebrtc": { "rswebrtc": {
"description": "GStreamer plugin for high level WebRTC elements and a simple signaling server", "description": "GStreamer plugin for high level WebRTC elements and a simple signaling server",
"elements": { "elements": {
"awskvswebrtcsink": {
"author": "Mathieu Duponchelle <mathieu@centricular.com>",
"description": "WebRTC sink",
"hierarchy": [
"GstAwsKvsWebRTCSink",
"GstWebRTCSink",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy",
"GstNavigation"
],
"klass": "Sink/Network/WebRTC",
"pad-templates": {
"audio_%%u": {
"caps": "audio/x-raw:\n",
"direction": "sink",
"presence": "request"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n",
"direction": "sink",
"presence": "request"
}
},
"rank": "none"
},
"webrtcsink": { "webrtcsink": {
"author": "Mathieu Duponchelle <mathieu@centricular.com>", "author": "Mathieu Duponchelle <mathieu@centricular.com>",
"description": "WebRTC sink", "description": "WebRTC sink",

View file

@ -33,6 +33,18 @@ gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-sig
human_bytes = "0.4" human_bytes = "0.4"
url = "2" url = "2"
aws-config = "0.52.0"
aws-types = "0.52.0"
aws-sig-auth = "0.52.0"
aws-smithy-http = { version = "0.52.0", features = [ "rt-tokio" ] }
aws-smithy-types = "0.52.0"
aws-sdk-kinesisvideo = "0.22.0"
aws-sdk-kinesisvideosignaling = "0.22.0"
http = "0.2.7"
chrono = "0.4"
data-encoding = "2.3.3"
url-escape = "0.1.1"
[dev-dependencies] [dev-dependencies]
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }

View file

@ -197,5 +197,19 @@ Parts of the JavaScript code in the www/ example are licensed under the [Apache
the rest is licensed under the [Mozilla Public License Version 2.0] unless advertised in the the rest is licensed under the [Mozilla Public License Version 2.0] unless advertised in the
header. header.
## Using the AWS KVS signaller
* Setup AWS Kinesis Video Streams
* Create a channel from the AWS console (<https://us-east-1.console.aws.amazon.com/kinesisvideo/home?region=us-east-1#/signalingChannels/create>)
* Start a producer:
```
AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc pattern=ball ! video/x-raw, width=1280, height=720 ! videoconvert ! textoverlay text="Hello from GStreamer!" ! videoconvert ! awskvswebrtcsink name=ws signaller::channel-name="XXX"
```
* Connect a viewer @ <https://awslabs.github.io/amazon-kinesis-video-streams-webrtc-sdk-js/examples/index.html>
[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 [Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0
[Apache License, Version 2.0]: https://www.apache.org/licenses/LICENSE-2.1 [Apache License, Version 2.0]: https://www.apache.org/licenses/LICENSE-2.1

View file

@ -0,0 +1,734 @@
// SPDX-License-Identifier: MPL-2.0
use super::protocol as p;
use crate::webrtcsink::WebRTCSink;
use crate::RUNTIME;
use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib;
use gst::glib::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::path::PathBuf;
use std::sync::Mutex;
use tokio::task;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_sdk_kinesisvideo::{
model::{ChannelProtocol, ChannelRole, SingleMasterChannelEndpointConfiguration},
Client, Endpoint,
};
use aws_sdk_kinesisvideosignaling::Client as SignalingClient;
use aws_sig_auth::signer::{self, HttpSignatureType, OperationSigningConfig, RequestConfig};
use aws_smithy_http::body::SdkBody;
use aws_types::credentials::ProvideCredentials;
use aws_types::region::{Region, SigningRegion};
use aws_types::{Credentials, SigningService};
use chrono::prelude::*;
use data_encoding::BASE64;
use http::Uri;
use std::time::{Duration, SystemTime};
const DEFAULT_AWS_REGION: &str = "us-east-1";
const DEFAULT_PING_TIMEOUT: i32 = 30;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"webrtc-aws-kvs-signaller",
gst::DebugColorFlags::empty(),
Some("WebRTC AWS KVS signaller"),
)
});
#[derive(Default)]
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::OutgoingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
}
#[derive(Clone)]
struct Settings {
address: Option<String>,
cafile: Option<PathBuf>,
access_key: Option<String>,
secret_access_key: Option<String>,
session_token: Option<String>,
channel_name: Option<String>,
ping_timeout: i32,
}
impl Default for Settings {
fn default() -> Self {
Self {
address: Some("ws://127.0.0.1:8443".to_string()),
cafile: None,
access_key: None,
secret_access_key: None,
session_token: None,
channel_name: None,
ping_timeout: DEFAULT_PING_TIMEOUT,
}
}
}
#[derive(Default)]
pub struct Signaller {
state: Mutex<State>,
settings: Mutex<Settings>,
}
impl Signaller {
fn handle_message(element: &WebRTCSink, msg: String) {
if let Ok(msg) = serde_json::from_str::<p::IncomingMessage>(&msg) {
match BASE64.decode(&msg.message_payload.into_bytes()) {
Ok(payload) => {
let payload = String::from_utf8_lossy(&payload);
match msg.message_type.as_str() {
"SDP_OFFER" => {
if let Ok(sdp_msg) = serde_json::from_str::<p::SdpOffer>(&payload) {
gst::log!(
CAT,
"Consumer {} got SDP offer: {}",
msg.sender_client_id,
sdp_msg.sdp
);
if let Err(err) = element.start_session(
&msg.sender_client_id,
&msg.sender_client_id,
Some(&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
gst_sdp::SDPMessage::parse_buffer(sdp_msg.sdp.as_bytes())
.unwrap(),
)),
) {
gst::warning!(CAT, obj: element, "{err}");
}
} else {
gst::warning!(
CAT,
obj: element,
"Failed to parse SDP_OFFER: {payload}"
);
}
}
"ICE_CANDIDATE" => {
if let Ok(ice_msg) = serde_json::from_str::<p::IceCandidate>(&payload) {
gst::log!(
CAT,
"Consumer {} got candidate {} for m_line {} and mid {}",
msg.sender_client_id,
ice_msg.candidate,
ice_msg.sdp_m_line_index,
ice_msg.sdp_mid
);
if let Err(err) = element.handle_ice(
&msg.sender_client_id,
Some(ice_msg.sdp_m_line_index),
Some(ice_msg.sdp_mid),
&ice_msg.candidate,
) {
gst::warning!(CAT, obj: element, "{err}");
}
} else {
gst::warning!(
CAT,
obj: element,
"Failed to parse ICE_CANDIDATE: {payload}"
);
}
}
_ => {
gst::log!(
CAT,
obj: element,
"Ignoring unsupported message type {}",
msg.message_type
);
}
}
}
Err(e) => {
gst::error!(
CAT,
obj: element,
"Failed to decode message payload from server: {e}"
);
element.handle_signalling_error(
anyhow!("Failed to decode message payload from server: {e}").into(),
);
}
}
} else {
gst::log!(CAT, obj: element, "Unknown message from server: [{msg}]");
}
}
async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> {
let settings = self.settings.lock().unwrap().clone();
let connector = if let Some(path) = settings.cafile {
let cert = tokio::fs::read_to_string(&path).await?;
let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?;
let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder();
let connector = connector_builder.add_root_certificate(cert).build()?;
Some(tokio_native_tls::TlsConnector::from(connector))
} else {
None
};
let region = Region::new(DEFAULT_AWS_REGION);
let access_key = settings.access_key.as_ref();
let secret_access_key = settings.secret_access_key.as_ref();
let session_token = settings.session_token.clone();
let credentials = match (access_key, secret_access_key) {
(Some(key), Some(secret_key)) => {
gst::debug!(
CAT,
imp: self,
"Using provided access and secret access key"
);
Ok(Credentials::new(
key.clone(),
secret_key.clone(),
session_token,
None,
"kvs",
))
}
_ => {
gst::debug!(CAT, imp: self, "Using default AWS credentials");
let cred = DefaultCredentialsChain::builder()
.region(region.clone())
.build()
.await;
cred.provide_credentials().await
}
};
let credentials = match credentials {
Err(e) => {
anyhow::bail!("Failed to retrieve credentials with error {e}");
}
Ok(credentials) => credentials,
};
let Some(channel_name) = settings.channel_name else { anyhow::bail!("Channel name cannot be None!"); };
let client = Client::new(
&aws_config::from_env()
.credentials_provider(credentials.clone())
.load()
.await,
);
let resp = client
.describe_signaling_channel()
.set_channel_name(Some(channel_name.clone()))
.send()
.await?;
let Some(cinfo) = resp.channel_info() else { anyhow::bail!("No description found for {channel_name}"); };
gst::debug!(CAT, "Channel description: {cinfo:?}");
let Some(channel_arn) = cinfo.channel_arn() else { anyhow::bail!("No channel ARN found for {channel_name}"); };
let config = SingleMasterChannelEndpointConfiguration::builder()
.set_protocols(Some(vec![ChannelProtocol::Wss, ChannelProtocol::Https]))
.set_role(Some(ChannelRole::Master))
.build();
let resp = client
.get_signaling_channel_endpoint()
.set_channel_arn(Some(channel_arn.to_string()))
.set_single_master_channel_endpoint_configuration(Some(config))
.send()
.await?;
gst::debug!(CAT, "Endpoints: {:?}", resp.resource_endpoint_list());
let endpoint_wss_uri =
match resp
.resource_endpoint_list()
.unwrap()
.iter()
.find_map(|endpoint| {
if endpoint.protocol == Some(ChannelProtocol::Wss) {
Some(endpoint.resource_endpoint().unwrap().to_owned())
} else {
None
}
}) {
Some(endpoint_uri_str) => Uri::from_maybe_shared(endpoint_uri_str).unwrap(),
None => {
anyhow::bail!("No WSS endpoint found for {channel_name}");
}
};
let endpoint_https_uri =
match resp
.resource_endpoint_list()
.unwrap()
.iter()
.find_map(|endpoint| {
if endpoint.protocol == Some(ChannelProtocol::Https) {
Some(endpoint.resource_endpoint().unwrap().to_owned())
} else {
None
}
}) {
Some(endpoint_uri_str) => Uri::from_maybe_shared(endpoint_uri_str).unwrap(),
None => {
anyhow::bail!("No HTTPS endpoint found for {channel_name}");
}
};
gst::debug!(
CAT,
"Endpoints: {:?} {:?}",
endpoint_wss_uri,
endpoint_https_uri
);
let signaling_config = aws_sdk_kinesisvideosignaling::config::Builder::from(
&aws_config::from_env()
.credentials_provider(credentials.clone())
.load()
.await,
)
.endpoint_resolver(Endpoint::immutable_uri(endpoint_https_uri.clone())?)
.build();
let signaling_client = SignalingClient::from_conf(signaling_config);
let resp = signaling_client
.get_ice_server_config()
.set_channel_arn(Some(channel_arn.to_string()))
.send()
.await?;
let ice_servers: Vec<String> = resp
.ice_server_list()
.unwrap()
.iter()
.filter_map(|server| {
Option::zip(server.username(), server.password())
.map(|(username, password)| (username, password, server))
})
.flat_map(|(username, password, server)| {
server
.uris()
.unwrap()
.iter()
.filter_map(move |uri| {
uri.split_once(':').map(|(protocol, host)| {
let (timestamp, username) = username.split_once(':').unwrap();
format!("{protocol}://{timestamp}%3A{encoded_user_name}:{encoded_password}@{host}",
encoded_user_name=url_escape::encode_userinfo(username),
encoded_password=url_escape::encode_userinfo(password),
)
})
})
})
.collect();
gst::info!(CAT, "Ice servers: {:?}", ice_servers);
element.connect_closure(
"consumer-added",
false,
glib::closure!(|_webrtcsink: &gst::Element,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
webrtcbin.set_property(
"stun-server",
format!("stun://stun.kinesisvideo.{DEFAULT_AWS_REGION}.amazonaws.com:443"),
);
for ice_server in &ice_servers {
let res = webrtcbin.emit_by_name::<bool>("add-turn-server", &[&ice_server]);
gst::debug!(CAT, "Added ICE server {ice_server}, res: {res}");
}
}),
);
let current_time = Utc::now();
let signer = signer::SigV4Signer::new();
let mut operation_config = OperationSigningConfig::default_config();
operation_config.signature_type = HttpSignatureType::HttpRequestQueryParams;
operation_config.expires_in = Some(Duration::from_secs(5 * 60)); // See commit a3db85d.
let request_config = RequestConfig {
request_ts: SystemTime::from(current_time),
region: &SigningRegion::from(region.clone()),
service: &SigningService::from_static("kinesisvideo"),
payload_override: None,
};
let transcribe_uri = Uri::builder()
.scheme("wss")
.authority(endpoint_wss_uri.authority().unwrap().to_owned())
.path_and_query(format!(
"/?X-Amz-ChannelARN={}",
aws_smithy_http::query::fmt_string(channel_arn)
))
.build()
.map_err(|err| {
gst::error!(CAT, imp: self, "Failed to build HTTP request URI: {err}");
anyhow!("Failed to build HTTP request URI: {err}")
})?;
let mut request = http::Request::builder()
.uri(transcribe_uri)
.body(SdkBody::empty())
.expect("Failed to build valid request");
let _signature = signer
.sign(
&operation_config,
&request_config,
&credentials,
&mut request,
)
.map_err(|err| {
gst::error!(CAT, imp: self, "Failed to sign HTTP request: {err}");
anyhow!("Failed to sign HTTP request: {err}")
})?;
let url = request.uri().to_string();
gst::debug!(CAT, "Signed URL: {url}");
let (ws, _) =
async_tungstenite::tokio::connect_async_with_tls_connector(url, connector).await?;
gst::info!(CAT, obj: element, "connected");
// Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split();
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (mut _websocket_sender, mut websocket_receiver) =
mpsc::channel::<p::OutgoingMessage>(1000);
let element_clone = element.downgrade();
let ping_timeout = settings.ping_timeout;
let send_task_handle = task::spawn(async move {
loop {
match tokio::time::timeout(
std::time::Duration::from_secs(ping_timeout as u64),
websocket_receiver.next(),
)
.await
{
Ok(Some(msg)) => {
if let Some(element) = element_clone.upgrade() {
gst::trace!(
CAT,
obj: element,
"Sending websocket message {}",
serde_json::to_string(&msg).unwrap()
);
}
ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
}
Ok(None) => {
break;
}
Err(_) => {
ws_sink.send(WsMessage::Ping(vec![])).await?;
}
}
}
if let Some(element) = element_clone.upgrade() {
gst::info!(CAT, obj: element, "Done sending");
}
ws_sink.send(WsMessage::Close(None)).await?;
ws_sink.close().await?;
Ok::<(), Error>(())
});
let element_clone = element.downgrade();
let receive_task_handle = task::spawn(async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(element) = element_clone.upgrade() {
match msg {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, "received message [{msg}]");
Signaller::handle_message(&element, msg);
}
Ok(WsMessage::Close(reason)) => {
gst::info!(
CAT,
obj: element,
"websocket connection closed: {:?}",
reason
);
element.shutdown();
break;
}
Ok(_) => (),
Err(err) => {
element
.handle_signalling_error(anyhow!("Error receiving: {err}").into());
break;
}
}
} else {
break;
}
}
if let Some(element) = element_clone.upgrade() {
gst::info!(CAT, obj: element, "Stopped websocket receiving");
}
});
let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(_websocket_sender);
state.send_task_handle = Some(send_task_handle);
state.receive_task_handle = Some(receive_task_handle);
Ok(())
}
pub fn start(&self, element: &WebRTCSink) {
let this = self.obj().clone();
let element_clone = element.clone();
task::spawn(async move {
let this = this.imp();
if let Err(err) = this.connect(&element_clone).await {
element_clone.handle_signalling_error(err.into());
}
});
}
pub fn handle_sdp(
&self,
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let state = self.state.lock().unwrap();
let msg = p::OutgoingMessage {
action: "SDP_ANSWER".to_string(),
message_payload: BASE64.encode(
&serde_json::to_string(&p::SdpAnswer {
type_: "answer".to_string(),
sdp: sdp.sdp().as_text().unwrap(),
})
.unwrap()
.into_bytes(),
),
recipient_client_id: session_id.to_string(),
};
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {err}").into());
}
}
});
}
}
pub fn handle_ice(
&self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
let state = self.state.lock().unwrap();
let msg = p::OutgoingMessage {
action: "ICE_CANDIDATE".to_string(),
message_payload: BASE64.encode(
&serde_json::to_string(&p::OutgoingIceCandidate {
candidate: candidate.to_string(),
sdp_mid: sdp_m_line_index.unwrap().to_string(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
})
.unwrap()
.into_bytes(),
),
recipient_client_id: session_id.to_string(),
};
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
RUNTIME.spawn(async move {
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {err}").into());
}
}
});
}
}
pub fn stop(&self, element: &WebRTCSink) {
gst::info!(CAT, obj: element, "Stopping now");
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
let element = element.downgrade();
RUNTIME.block_on(async move {
sender.close_channel();
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
if let Some(element) = element.upgrade() {
gst::warning!(
CAT,
obj: element,
"Error while joining send task: {err}"
);
}
}
}
if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await {
if let Some(element) = element.upgrade() {
gst::warning!(
CAT,
obj: element,
"Error while joining receive task: {err}"
);
}
}
}
});
}
}
pub fn end_session(&self, element: &WebRTCSink, session_id: &str) {
gst::info!(CAT, obj: element, "Signalling session {session_id} ended");
// We can seemingly not do anything beyond that
}
}
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstAwsKvsWebRTCSinkSignaller";
type Type = super::AwsKvsSignaller;
type ParentType = glib::Object;
}
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("address")
.nick("Address")
.blurb("Address of the signalling server")
.default_value("ws://127.0.0.1:8443")
.build(),
glib::ParamSpecString::builder("cafile")
.nick("CA file")
.blurb("Path to a Certificate file to add to the set of roots the TLS connector will trust")
.build(),
glib::ParamSpecString::builder("channel-name")
.nick("Channel name")
.blurb("Name of the channel to connect as master to")
.build(),
glib::ParamSpecInt::builder("ping-timeout")
.nick("Ping Timeout")
.blurb("How often (in seconds) to send pings to keep the websocket alive")
.default_value(DEFAULT_PING_TIMEOUT)
.minimum(1)
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"address" => {
let address: Option<_> = value.get().expect("type checked upstream");
if let Some(address) = address {
gst::info!(CAT, "Signaller address set to {address}");
let mut settings = self.settings.lock().unwrap();
settings.address = Some(address);
} else {
gst::error!(CAT, "address can't be None");
}
}
"cafile" => {
let value: String = value.get().unwrap();
let mut settings = self.settings.lock().unwrap();
settings.cafile = Some(value.into());
}
"access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.access_key = value.get().expect("type checked upstream");
}
"secret-access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream");
}
"session-token" => {
let mut settings = self.settings.lock().unwrap();
settings.session_token = value.get().expect("type checked upstream");
}
"channel-name" => {
let mut settings = self.settings.lock().unwrap();
settings.channel_name = value.get().expect("type checked upstream");
}
"ping-timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.ping_timeout = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"address" => self.settings.lock().unwrap().address.to_value(),
"cafile" => {
let settings = self.settings.lock().unwrap();
let cafile = settings.cafile.as_ref();
cafile.and_then(|file| file.to_str()).to_value()
}
"access-key" => {
let settings = self.settings.lock().unwrap();
settings.access_key.to_value()
}
"secret-access-key" => {
let settings = self.settings.lock().unwrap();
settings.secret_access_key.to_value()
}
"session-token" => {
let settings = self.settings.lock().unwrap();
settings.session_token.to_value()
}
"channel-name" => self.settings.lock().unwrap().channel_name.to_value(),
"ping-timeout" => self.settings.lock().unwrap().ping_timeout.to_value(),
_ => unimplemented!(),
}
}
}

View file

@ -0,0 +1,65 @@
// SPDX-License-Identifier: MPL-2.0
use crate::webrtcsink::{Signallable, WebRTCSink};
use gst::glib;
use gst::subclass::prelude::*;
use std::error::Error;
mod imp;
mod protocol;
glib::wrapper! {
pub struct AwsKvsSignaller(ObjectSubclass<imp::Signaller>);
}
unsafe impl Send for AwsKvsSignaller {}
unsafe impl Sync for AwsKvsSignaller {}
impl Signallable for AwsKvsSignaller {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.start(element);
Ok(())
}
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_sdp(element, peer_id, sdp);
Ok(())
}
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>> {
let signaller = self.imp();
signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid);
Ok(())
}
fn stop(&mut self, element: &WebRTCSink) {
let signaller = self.imp();
signaller.stop(element);
}
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) {
let signaller = self.imp();
signaller.end_session(element, session_id);
}
}
impl Default for AwsKvsSignaller {
fn default() -> Self {
glib::Object::new()
}
}

View file

@ -0,0 +1,53 @@
// SPDX-License-Identifier: MPL-2.0
/// The default protocol used by the signalling server
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SdpOffer {
#[serde(rename = "type")]
pub type_: String,
pub sdp: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IceCandidate {
pub candidate: String,
pub sdp_mid: String,
pub sdp_m_line_index: u32,
pub username_fragment: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IncomingMessage {
pub message_type: String,
pub message_payload: String,
pub sender_client_id: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SdpAnswer {
#[serde(rename = "type")]
pub type_: String,
pub sdp: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OutgoingIceCandidate {
pub candidate: String,
pub sdp_mid: String,
pub sdp_m_line_index: u32,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OutgoingMessage {
pub action: String,
pub message_payload: String,
pub recipient_client_id: String,
}

View file

@ -10,6 +10,7 @@ use gst::glib;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::runtime; use tokio::runtime;
mod aws_kvs_signaller;
mod signaller; mod signaller;
pub mod utils; pub mod utils;
pub mod webrtcsink; pub mod webrtcsink;

View file

@ -139,7 +139,7 @@ impl Signaller {
peer_id, peer_id,
} => { } => {
if let Err(err) = if let Err(err) =
element.start_session(&session_id, &peer_id) element.start_session(&session_id, &peer_id, None)
{ {
gst::warning!(CAT, obj: element, "{}", err); gst::warning!(CAT, obj: element, "{}", err);
} }

File diff suppressed because it is too large Load diff

View file

@ -18,8 +18,9 @@ glib::wrapper! {
pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation; pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
} }
unsafe impl Send for WebRTCSink {} glib::wrapper! {
unsafe impl Sync for WebRTCSink {} pub struct AwsKvsWebRTCSink(ObjectSubclass<imp::AwsKvsWebRTCSink>) @extends WebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
}
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum WebRTCSinkError { pub enum WebRTCSinkError {
@ -79,12 +80,6 @@ pub trait SignallableObject: AsRef<glib::Object> + Signallable {}
impl<T: AsRef<glib::Object> + Signallable> SignallableObject for T {} impl<T: AsRef<glib::Object> + Signallable> SignallableObject for T {}
impl Default for WebRTCSink {
fn default() -> Self {
glib::Object::new()
}
}
impl WebRTCSink { impl WebRTCSink {
pub fn with_signaller(signaller: Box<dyn SignallableObject>) -> Self { pub fn with_signaller(signaller: Box<dyn SignallableObject>) -> Self {
let ret = glib::Object::new::<WebRTCSink>(); let ret = glib::Object::new::<WebRTCSink>();
@ -123,9 +118,19 @@ impl WebRTCSink {
ws.handle_signalling_error(self, anyhow::anyhow!(error)); ws.handle_signalling_error(self, anyhow::anyhow!(error));
} }
pub fn start_session(&self, session_id: &str, peer_id: &str) -> Result<(), WebRTCSinkError> { pub fn shutdown(&self) {
let ws = self.imp(); let ws = self.imp();
ws.start_session(self, session_id, peer_id) ws.shutdown(self);
}
pub fn start_session(
&self,
session_id: &str,
peer_id: &str,
offer: Option<&gst_webrtc::WebRTCSessionDescription>,
) -> Result<(), WebRTCSinkError> {
let ws = self.imp();
ws.start_session(self, session_id, peer_id, offer)
} }
pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> { pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> {
@ -163,5 +168,13 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
"webrtcsink", "webrtcsink",
gst::Rank::None, gst::Rank::None,
WebRTCSink::static_type(), WebRTCSink::static_type(),
) )?;
gst::Element::register(
Some(plugin),
"awskvswebrtcsink",
gst::Rank::None,
AwsKvsWebRTCSink::static_type(),
)?;
Ok(())
} }