Merge 'webrtcsink' from 020c7e2900

This commit is contained in:
Thibault Saunier 2022-10-20 11:51:58 +02:00
commit eb9d0bb824
51 changed files with 16369 additions and 0 deletions

net/webrtc/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

net/webrtc/Cargo.toml Normal file
View file

@ -0,0 +1,7 @@
members = [

net/webrtc/LICENSE Normal file
View file

net/webrtc/ Normal file
View file

@ -0,0 +1,224 @@
# webrtcsink
All-batteries included GStreamer WebRTC producer, that tries its best to do The Right Thing™.
## Use case
The [webrtcbin] element in GStreamer is extremely flexible and powerful, but using
it can be a difficult exercise. When all you want to do is serve a fixed set of streams
to any number of consumers, `webrtcsink` (which wraps `webrtcbin` internally) can be a
useful alternative.
## Features
`webrtcsink` implements the following features:
* Built-in signaller: when using the default signalling server, this element will
perform signalling without requiring application interaction.
This makes it usable directly from `gst-launch`.
* Application-provided signalling: `webrtcsink` can be instantiated by an application
with a custom signaller. That signaller must be a GObject, and must implement the
`Signallable` interface as defined [here](plugins/src/webrtcsink/ The
[default signaller](plugins/src/signaller/ can be used as an example.
An [example project] is also available to use as a boilerplate for implementing
and using a custom signaller.
* Sandboxed consumers: when a consumer is added, its encoder / payloader / webrtcbin
elements run in a separately managed pipeline. This provides a certain level of
sandboxing, as opposed to having those elements running inside the element itself.
It is important to note that at this moment, encoding is not shared between consumers.
While this is not on the roadmap at the moment, nothing in the design prevents
implementing this optimization.
* Congestion control: the element leverages transport-wide congestion control
feedback messages in order to adapt the bitrate of individual consumers' video
encoders to the available bandwidth.
* Configuration: the level of user control over the element is slowly expanding,
consult `gst-inspect-1.0` for more information on the available properties and
* Packet loss mitigation: webrtcsink now supports sending protection packets for
Forward Error Correction, modulating the amount as a function of the available
bandwidth, and can honor retransmission requests. Both features can be disabled
via properties.
It is important to note that full control over the individual elements used by
`webrtcsink` is *not* on the roadmap, as it will act as a black box in that respect,
for example `webrtcsink` wants to reserve control over the bitrate for congestion
A signal is now available however for the application to provide the initial
configuration for the encoders `webrtcsink` instantiates.
If more granular control is required, applications should use `webrtcbin` directly,
`webrtcsink` will focus on trying to just do the right thing, although it might
expose more interfaces to guide and tune the heuristics it employs.
[example project]:
## Building
### Prerequisites
The element has only been tested for now against GStreamer main.
For testing, it is recommended to simply build GStreamer locally and run
in the uninstalled devenv.
> Make sure to install the development packages for some codec libraries
> beforehand, such as libx264, libvpx and libopusenc, exact names depend
> on your distribution.
git clone --depth 1 --single-branch --branch main
cd gstreamer
meson build
ninja -C build
ninja -C build devenv
### Compiling
``` shell
cargo build
## Usage
Open three terminals. In the first, run:
``` shell
WEBRTCSINK_SIGNALLING_SERVER_LOG=debug cargo run --bin server
In the second, run:
``` shell
python3 -m http.server -d www/
In the third, run:
``` shell
gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws.
When the pipeline above is running succesfully, open a browser and
point it to the http server:
``` shell
You should see an identifier listed in the left-hand panel, click on
it. You should see a test video stream, and hear a test tone.
## Configuration
The element itself can be configured through its properties, see
`gst-inspect-1.0 webrtcsink` for more information about that, in addition the
default signaller also exposes properties for configuring it, in
particular setting the signalling server address, those properties
can be accessed through the `gst::ChildProxy` interface, for example
with gst-launch:
``` shell
gst-launch-1.0 webrtcsink signaller::address="ws://" ..
The signaller object can not be inspected, refer to [the source code]
for the list of properties.
[the source code]: plugins/src/signaller/
### Enable 'navigation' a.k.a user interactivity with the content
`webrtcsink` implements the [`GstNavigation`] interface which allows interacting
with the content, for example move with your mouse, entering keys with the
keyboard, etc... On top of that a `WebRTCDataChannel` based protocol has been
implemented and can be activated with the `enable-data-channel-navigation=true`
property. The [demo](www/) implements the protocol and you can easily test this
feature, using the [`wpesrc`] for example.
As an example, the following pipeline allows you to navigate the GStreamer
documentation inside the video running within your web browser (in if you followed previous steps of that readme):
gst-launch-1.0 wpesrc location= ! webrtcsink enable-data-channel-navigation=true
## Testing congestion control
For the purpose of testing congestion in a reproducible manner, a
[simple tool] has been used, I only used it on Linux but it is documented
as usable on MacOS too. I had to run the client browser on a separate
machine on my local network for congestion to actually be applied, I didn't
look into why that was necessary.
My testing procedure was:
* identify the server machine network interface (eg with `ifconfig` on Linux)
* identify the client machine IP address (eg with `ifconfig` on Linux)
* start the various services as explained in the Usage section (use
`GST_DEBUG=webrtcsink:7` to get detailed logs about congestion control)
* start playback in the client browser
* Run a `comcast` command on the server machine, for instance:
``` shell
/home/meh/go/bin/comcast --device=$SERVER_INTERFACE --target-bw 3000 --target-addr=$CLIENT_IP --target-port=1:65535 --target-proto=udp
* Observe the bitrate sharply decreasing, playback should slow down briefly
then catch back up
* Remove the bandwidth limitation, and observe the bitrate eventually increasing
back to a maximum:
``` shell
/home/meh/go/bin/comcast --device=$SERVER_INTERFACE --stop
For comparison, the congestion control property can be set to disabled on
webrtcsink, then the above procedure applied again, the expected result is
for playback to simply crawl down to a halt until the bandwidth limitation
is lifted:
``` shell
gst-launch-1.0 webrtcsink congestion-control=disabled
[simple tool]:
## Monitoring tool
An example server / client application for monitoring per-consumer stats
can be found [here].
[here]: plugins/examples/
## License
All the rust code in this repository is licensed under the [Mozilla Public License Version 2.0].
Parts of the JavaScript code in the www/ example are licensed under the [Apache License, Version 2.0],
the rest is licensed under the [Mozilla Public License Version 2.0] unless advertised in the
[Mozilla Public License Version 2.0]:
[Apache License, Version 2.0]:

View file

@ -0,0 +1,68 @@
name = "webrtcsink"
version = "0.1.0"
edition = "2018"
authors = ["Mathieu Duponchelle <>"]
license = "MPL-2.0"
description = "GStreamer WebRTC sink"
repository = ""
build = ""
gst = { git="", package = "gstreamer", features = ["v1_20", "serde"] }
gst-app = { git="", package = "gstreamer-app", features = ["v1_20"] }
gst-video = { git="", package = "gstreamer-video", features = ["v1_20", "serde"] }
gst-webrtc = { git="", package = "gstreamer-webrtc", features = ["v1_20"] }
gst-sdp = { git="", package = "gstreamer-sdp", features = ["v1_20"] }
gst-rtp = { git="", package = "gstreamer-rtp", features = ["v1_20"] }
gst-utils = { git="", package = "gstreamer-utils" }
once_cell = "1.0"
chrono = { version = "0.4", default-features = false }
smallvec = "1"
anyhow = "1"
thiserror = "1"
futures = "0.3"
async-std = { version = "1", features = ["unstable"] }
async-native-tls = { version = "0.4.0" }
async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] }
serde = "1"
serde_json = "1"
fastrand = "1.0"
webrtcsink-protocol = { version = "0.1", path="../protocol" }
human_bytes = "0.3.1"
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-log = "0.1"
uuid = { version = "1", features = ["v4"] }
clap = { version = "4", features = ["derive"] }
name = "webrtcsink"
crate-type = ["cdylib", "rlib"]
path = "src/"
gst-plugin-version-helper = "0.7"
static = []
capi = []
gst1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
min_version = "0.8.0"
enabled = false
install_subdir = "gstreamer-1.0"
versioning = false
requires_private = "gstreamer-rtp >= 1.20, gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0"
name = "webrtcsink-stats-server"

View file

@ -0,0 +1,3 @@
fn main() {

View file

@ -0,0 +1,18 @@
# webrtcsink examples
Collection (1-sized for now) of webrtcsink examples
## webrtcsink-stats-server
A simple application that instantiates a webrtcsink and serves stats
over websockets.
The application expects a signalling server to be running at `ws://localhost:8443`,
similar to the usage example in the main README.
``` shell
cargo run --example webrtcsink-stats-server
Once it is running, follow the instruction in the webrtcsink-stats folder to
run an example client.

View file

@ -0,0 +1,235 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use anyhow::Error;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use clap::Parser;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib::Type;
use gst::prelude::*;
use tracing::{debug, info, trace};
use tracing_subscriber::prelude::*;
#[derive(Parser, Debug)]
#[clap(about, version, author)]
/// Program arguments
struct Args {
/// URI of file to serve. Must hold at least one audio and video stream
uri: String,
/// Disable Forward Error Correction
disable_fec: bool,
/// Disable retransmission
disable_retransmission: bool,
/// Disable congestion control
disable_congestion_control: bool,
fn serialize_value(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
Type::STRING => Some(val.get::<String>().unwrap().into()),
Type::BOOL => Some(val.get::<bool>().unwrap().into()),
Type::I32 => Some(val.get::<i32>().unwrap().into()),
Type::U32 => Some(val.get::<u32>().unwrap().into()),
Type::I_LONG | Type::I64 => Some(val.get::<i64>().unwrap().into()),
Type::U_LONG | Type::U64 => Some(val.get::<u64>().unwrap().into()),
Type::F32 => Some(val.get::<f32>().unwrap().into()),
Type::F64 => Some(val.get::<f64>().unwrap().into()),
_ => {
if let Ok(s) = val.get::<gst::Structure>() {
.filter_map(|(name, value)| {
serialize_value(value).map(|value| (name.to_string(), value))
.collect::<HashMap<String, serde_json::Value>>(),
} else if let Ok(a) = val.get::<gst::Array>() {
.filter_map(|value| serialize_value(value))
} else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
.map(|value| value.nick())
} else if let Ok(value) = val.serialize() {
} else {
struct Listener {
id: uuid::Uuid,
sender: mpsc::Sender<WsMessage>,
struct State {
listeners: Vec<Listener>,
async fn run(args: Args) -> Result<(), Error> {
tracing_log::LogTracer::init().expect("Failed to set logger");
let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTCSINK_STATS_LOG")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
let subscriber = tracing_subscriber::Registry::default()
tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
let state = Arc::new(Mutex::new(State { listeners: vec![] }));
let addr = "".to_string();
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
let pipeline_str = format!(
"webrtcsink name=ws do-retransmission={} do-fec={} congestion-control={} \
uridecodebin name=d uri={} \
d. ! video/x-raw ! queue ! ws.video_0 \
d. ! audio/x-raw ! queue ! ws.audio_0",
if args.disable_congestion_control {
} else {
let pipeline = gst::parse_launch(&pipeline_str)?;
let ws = pipeline
ws.connect("encoder-setup", false, |values| {
let encoder = values[3].get::<gst::Element>().unwrap();
info!("Encoder: {}", encoder.factory().unwrap().name());
let configured = if let Some(factory) = encoder.factory() {
match {
"does-not-exist" => {
// One could configure a hardware encoder to their liking here,
// and return true to make sure webrtcsink does not do any configuration
// of its own
_ => false,
} else {
let ws_clone = ws.downgrade();
let state_clone = state.clone();
task::spawn(async move {
let mut interval = async_std::stream::interval(std::time::Duration::from_millis(100));
while {
if let Some(ws) = ws_clone.upgrade() {
let stats =<gst::Structure>("stats");
let stats = serialize_value(&stats.to_value()).unwrap();
debug!("Stats: {}", serde_json::to_string_pretty(&stats).unwrap());
let msg = WsMessage::Text(serde_json::to_string(&stats).unwrap());
let listeners = state_clone.lock().unwrap().listeners.clone();
for mut listener in listeners {
if listener.sender.send(msg.clone()).await.is_err() {
let mut state = state_clone.lock().unwrap();
let index = state
.position(|l| ==
} else {
while let Ok((stream, _)) = listener.accept().await {
task::spawn(accept_connection(state.clone(), stream));
async fn accept_connection(state: Arc<Mutex<State>>, stream: TcpStream) {
let addr = stream
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let mut ws_stream = async_tungstenite::accept_async(stream)
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
let mut state = state.lock().unwrap();
let (sender, mut receiver) = mpsc::channel::<WsMessage>(1000);
state.listeners.push(Listener {
id: uuid::Uuid::new_v4(),
task::spawn(async move {
while let Some(msg) = {
trace!("Sending to one listener!");
if ws_stream.send(msg).await.is_err() {
info!("Listener errored out");
fn main() -> Result<(), Error> {
let args = Args::parse();

View file

@ -0,0 +1,4 @@

View file

@ -0,0 +1,20 @@
# Example web client for webrtcsink-stats-server
This web client will display live statistics as received through a
websocket connected to a `webrtcsink-stats-server`.
``` shell
npm install
npm run dev
Then navigate to `http://localhost:3000/`. Once consumers are connected
to the webrtc-sink-stats-server, they should be listed on the page, clicking
on any consumer will show a modal with plots for some of the most interesting
The stat server can also be specified through the `remote-url` search parameter,
for example you can access a distant stat server with

View file

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8" />
<link rel="icon" href="/favicon.ico" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Svelte + TS + Vite App</title>
<div id="app"></div>
<script type="module" src="/src/main.ts"></script>

View file

View file

@ -0,0 +1,27 @@
"name": "webrtcsink-stats",
"version": "0.0.0",
"type": "module",
"scripts": {
"dev": "vite",
"build": "vite build",
"serve": "vite preview",
"check": "svelte-check --tsconfig ./tsconfig.json"
"devDependencies": {
"@fortawesome/free-solid-svg-icons": "^5.15.4",
"@sveltejs/vite-plugin-svelte": "^1.0.0-next.11",
"@tsconfig/svelte": "^2.0.1",
"sass": "^1.43.5",
"svelte": "^3.37.0",
"svelte-check": "^2.1.0",
"svelte-fa": "^2.4.0",
"svelte-preprocess": "^4.7.2",
"tslib": "^2.2.0",
"typescript": "^4.3.2",
"vite": "^2.6.4"
"dependencies": {
"moment": "^2.29.1"

Binary file not shown.


Width:  |  Height:  |  Size: 1.1 KiB

View file

@ -0,0 +1,182 @@
<script src="" type="text/javascript"></script>
<script lang="ts">
import Home from '@/pages/Home.svelte'
import Header from '@/components/Header.svelte'
import type { ConsumerType } from '@/types/app'
import { WebSocketStatus, MitigationMode } from '@/types/app'
import { onMount, onDestroy } from 'svelte';
let ws: WebSocket | undefined = undefined
let websocketStatus: WebSocketStatus = WebSocketStatus.Connecting
let consumers: Map<string, ConsumerType> = new Map ()
let consumers_array: Array<ConsumerType> = []
let timeout: ReturnType<typeof setTimeout> | undefined = undefined
const updateConsumerStats = (consumer: ConsumerType, stats: Object) => {
let target_bitrate = 0
let fec_percentage = 0
let keyframe_requests = 0
let retransmission_requests = 0
let bitrate_sent = 0
let bitrate_recv = 0
let packet_loss = 0
let delta_of_delta = 0
if (stats["consumer-stats"]["video-encoders"].length > 0) {
let venc = stats["consumer-stats"]["video-encoders"][0]
target_bitrate = venc["bitrate"]
fec_percentage = venc["fec-percentage"]
consumer.video_codec = venc["codec-name"]
let mitigation_mode = MitigationMode.None
for (let mode of venc["mitigation-mode"].split("+")) {
switch (mode) {
case "none": {
mitigation_mode |= MitigationMode.None
case "downscaled": {
mitigation_mode |= MitigationMode.Downscaled
case "downsampled": {
mitigation_mode |= MitigationMode.Downsampled
consumer.mitigation_mode = mitigation_mode
for (let svalue of Object.values(stats)) {
if (svalue["type"] == "transport") {
let twcc_stats = svalue["gst-twcc-stats"]
if (twcc_stats !== undefined) {
bitrate_sent = twcc_stats["bitrate-sent"]
bitrate_recv = twcc_stats["bitrate-recv"]
packet_loss = twcc_stats["packet-loss-pct"]
delta_of_delta = twcc_stats["avg-delta-of-delta"]
} else if (svalue["type"] == "outbound-rtp") {
keyframe_requests += svalue["pli-count"]
retransmission_requests += svalue["nack-count"]
consumer.stats["target_bitrate"] = target_bitrate
consumer.stats["fec_percentage"] = fec_percentage
consumer.stats["bitrate_sent"] = bitrate_sent
consumer.stats["bitrate_recv"] = bitrate_recv
consumer.stats["packet_loss"] = packet_loss
consumer.stats["delta_of_delta"] = delta_of_delta
consumer.stats["keyframe_requests"] = keyframe_requests
consumer.stats["retransmission_requests"] = retransmission_requests
const fetchStats = () => {
const urlParams = new URLSearchParams(;
var remote_server = urlParams.get('remote-url');
if (!remote_server)
remote_server = ""
const ws_url = `ws://${remote_server}`;`Logging to ${ws_url}`);
ws = new WebSocket(ws_url);
ws.onerror = () => {
websocketStatus = WebSocketStatus.Error
ws.onclose = () => {
websocketStatus = WebSocketStatus.Error
consumers = new Map()
consumers_array = []
timeout = setTimeout(fetchStats, 500)
ws.onopen = () => {
websocketStatus = WebSocketStatus.Connected
ws.onmessage = (event) => {
let stats = JSON.parse(
// Set is supposed to be buildable from an iterator,
// no idea why the Arra.from is needed ..
let to_remove = new Set(Array.from(consumers.keys()))
for (let [key, value] of Object.entries(stats)) {
let consumer = undefined;
if (consumers.get(key) === undefined) {
consumer = {
id: key,
video_codec: undefined,
mitigation_mode: MitigationMode.None,
stats: new Map([
["target_bitrate", 0],
["fec_percentage", 0],
["bitrate_sent", 0],
["bitrate_recv", 0],
["packet_loss", 0],
["delta_of_delta", 0],
["keyframe_requests", 0],
["retransmission_requests", 0],
consumers.set(key, consumer)
} else {
consumer = consumers.get(key)
updateConsumerStats(consumer, value)
for (let key of to_remove) {
consumers_array = Array.from(consumers.values())
const closeWebSocket = () => {
if (ws != undefined) {
ws = undefined;
if (timeout != undefined) {
timeout = undefined
<Header websocketStatus={ websocketStatus } />
<Home consumers={ consumers_array } />
<style lang="scss">
:root {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen,
Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
height: 100%;
:global(body) {
/* this will apply to <body> */
margin: 0;
height: 100%;
background-color: #fbfbfb;

Binary file not shown.


Width:  |  Height:  |  Size: 124 KiB

Binary file not shown.


Width:  |  Height:  |  Size: 5.1 KiB

Binary file not shown.


Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.


Width:  |  Height:  |  Size: 70 KiB

View file

@ -0,0 +1,32 @@
<script lang="ts">
import type { ConsumerType } from '@/types/app'
import EncoderProps from '@/components/EncoderProps.svelte'
export let consumer: ConsumerType
$: id =
<div class="consumer-card" on:click >
<div class="id">{id}</div>
<style lang="scss">
.consumer-card {
word-break: break-all;
width: 150px;
height: 100px;
margin-right: 15px;
background-color: #fff;
padding: 15px;
border-radius: 10px;
box-shadow: rgb(0 0 0 / 24%) 0px 3px 8px;
.id {
font-weight: bold;
margin-bottom: 10px;

View file

@ -0,0 +1,53 @@
<script lang="ts">
import type { ConsumerType } from '@/types/app'
import { MitigationMode } from '@/types/app'
import Fa from 'svelte-fa'
import { faExclamationTriangle, faCheckCircle } from '@fortawesome/free-solid-svg-icons';
import vp8_logo from '@/assets/vp8.png'
import vp9_logo from '@/assets/vp9.png'
import h264_logo from '@/assets/h264.png'
export let consumer: ConsumerType
$: video_codec = consumer.video_codec
$: mitigation_mode = consumer.mitigation_mode
<div class="encoder-props">
<div class="codec">
{#if video_codec == "video/x-vp8"}
<img src={vp8_logo} alt="VP8">
{:else if video_codec == "video/x-vp9"}
<img src={vp9_logo} alt="VP8">
{:else if video_codec == "video/x-h264"}
<img src={h264_logo} alt="VP8">
{#if mitigation_mode & MitigationMode.Downsampled && mitigation_mode & MitigationMode.Downscaled}
<abbr title="Very congested link, video is downscaled and downsampled">
<Fa icon={faExclamationTriangle} color="tomato" />
{:else if mitigation_mode & MitigationMode.Downscaled}
<abbr title="Congested link, video is downscaled">
<Fa icon={faExclamationTriangle} color="orange" />
<abbr title="Link with minimal to no congestion">
<Fa icon={faCheckCircle} color="lightseagreen" />
<style lang="scss">
.encoder-props {
display: flex;
justify-content: space-evenly;
.codec {
img {
width: 25px;

View file

@ -0,0 +1,48 @@
<script lang="ts">
import { WebSocketStatus } from '@/types/app'
import logo from '@/assets/svelte.png'
import Fa from 'svelte-fa'
import { faSpinner, faExclamationTriangle, faCheckCircle } from '@fortawesome/free-solid-svg-icons';
export let websocketStatus: WebSocketStatus
<header class="global-header">
<div class="app-name">
<img src={logo} alt="Svelte Logo" class="logo"/>
<div class="title">WebRTC Stats App</div>
{#if websocketStatus == WebSocketStatus.Connected}
<Fa icon={faCheckCircle} color="lightseagreen" size="1x" />
{:else if websocketStatus == WebSocketStatus.Connecting}
<Fa icon={faSpinner} color="#afaeae" size="1x" spin />
{:else if websocketStatus == WebSocketStatus.Error}
<Fa icon={faExclamationTriangle} color="tomato" size="1x" />
<style lang="scss">
.global-header {
background-color: #313131;
height: 56px;
color: white;
padding: 15px;
box-sizing: border-box;
.app-name {
display: flex;
align-items: center;
height: 100%;
.logo {
height: 100%;
margin-right: 5px;
.title {
font-weight: bold;
margin-right: 5px;

View file

@ -0,0 +1,50 @@
<script lang="ts">
import Fa from 'svelte-fa'
import { faTimes } from '@fortawesome/free-solid-svg-icons';
import { createEventDispatcher } from 'svelte';
const dispatch = createEventDispatcher();
<div class="modal-overlay">
<div class="modal">
<div class="modal-header">
<slot name="title"></slot>
<div class="close-icon" on:click="{() => dispatch('closeModal')}">
<Fa icon={faTimes} color="#afaeae" size="1x" />
<slot name="body"></slot>
<slot name="footer"></slot>
<style lang="scss">
.modal {
background-color: white;
padding: 15px 0;
border-radius: 10px;
box-shadow: rgb(0 0 0 / 24%) 0px 3px 8px;
&-overlay {
position: absolute;
top: 0;
height: 100vh;
width: 100vw;
display: flex;
align-items: center;
justify-content: center;
background-color: rgb(0, 0, 0, 0.4);
&-header {
display: flex;
justify-content: space-between;
padding: 0 15px 10px;
border-bottom: 3px solid #e2e2e2;
.close-icon {
cursor: pointer;

View file

@ -0,0 +1,142 @@
<script src="" type="text/javascript"></script>
<script lang="ts">
import { createEventDispatcher, onMount, onDestroy } from 'svelte';
import Modal from '@/components/Modal.svelte'
import type { ConsumerType } from '@/types/app'
import EncoderProps from '@/components/EncoderProps.svelte'
export let consumer: ConsumerType
$: if (consumer === undefined) {
$: id = consumer !== undefined ? : undefined
const dispatch = createEventDispatcher();
let interval: ReturnType<typeof setInterval> | undefined = undefined
onMount(() => {
let plotDiv = document.getElementById('plotDiv');
let traces = []
let layout = {
legend: {traceorder: 'reversed'},
height: 800,
let ctr = 1;
let domain_step = 1.0 / consumer.stats.size
let domain_margin = 0.05
for (let key of consumer.stats.keys()) {
let trace = {
x: [],
y: [],
xaxis: 'x' + ctr,
yaxis: 'y' + ctr,
mode: 'lines',
line: {shape: 'spline'},
name: key
layout['xaxis' + ctr] = {
type: 'date',
layout['yaxis' + ctr] = {
domain: [(ctr - 1) * domain_step, (ctr * domain_step) - domain_margin],
rangemode: "tozero",
ctr += 1
Plotly.newPlot(plotDiv, traces, layout);
interval = setInterval(function() {
let time = new Date()
let ctr = 0
let traces = []
let data_update = {
x: [],
y: [],
for (let value of Object.values(consumer.stats)) {
ctr += 1
Plotly.extendTraces(plotDiv, data_update, traces, 600)
}, 50);
onDestroy(() => {
console.log ("destroyed")
if (interval !== undefined ) {
clearInterval (interval)
interval = undefined
<Modal on:closeModal="{() => dispatch('close')}">
<div slot="body" class="modal-body">
<div class="id">{id}</div>
<div id="plotDiv"></div>
<div slot="footer" class="modal-footer">
<div class="buttons-wrapper">
on:click|stopPropagation="{() => dispatch('close')}"
<style lang="scss">
.modal {
&-body {
width: 1000px;
padding: 20px 15px 10px;
gap: 15px 0;
.id {
font-weight: bold;
margin-bottom: 10px;
&-footer {
padding: 0 15px;
.buttons-wrapper {
text-align: right;
.button {
height: 30px;
padding: 0 10px;
text-align: center;
box-sizing: content-box;
border-radius: 3px;
border: 1px solid #000;
&:active {
background-color: #b9b7b7;

@ -0,0 +1,7 @@
import App from './App.svelte'
const app = new App({
target: document.getElementById('app')
export default app

@ -0,0 +1,64 @@
<script lang="ts">
import type { ConsumerType } from '@/types/app'
import Consumer from '@/components/Consumer.svelte'
import PlotConsumerModal from '@/components/PlotConsumerModal.svelte'
export let consumers: Array<ConsumerType>
let consumerToPlot: ConsumerType | undefined
let showPlotModal = false
* Display the Plot modal
* @param {ConsumerType} consumer
const openPlotConsumer = (consumer: ConsumerType) => {
consumerToPlot = consumer
showPlotModal = true
* Close the Plot modal
const closePlotConsumer = () => {
consumerToPlot = undefined
showPlotModal = false
<div class="consumer-card-container">
{#each consumers as consumer}
consumer = {consumer}
on:click="{() => { openPlotConsumer(consumer) }}"
{#if showPlotModal}
consumer={consumers.find(consumer => consumer == consumerToPlot)}
<style lang="scss">
main {
padding: 2em;
margin: 0 auto;
width: 100vw;
box-sizing: border-box;
.consumer-card {
&-container {
display: flex;
flex-wrap: wrap;
row-gap: 20px;
justify-content: space-evenly;

@ -0,0 +1,18 @@
export enum MitigationMode {
None = 0,
Downscaled = 1,
Downsampled = 2,
export interface ConsumerType {
id: string,
video_codec: string | undefined,
mitigation_mode: MitigationMode,
stats: Map<string, number>,
export enum WebSocketStatus {
Connecting = 0,
Connected = 1,
Error = 2,

@ -0,0 +1,2 @@
/// <reference types="svelte" />
/// <reference types="vite/client" />

@ -0,0 +1,13 @@
import sveltePreprocess from 'svelte-preprocess'
import * as sass from 'sass'
export default {
// Consult
// for more information about preprocessors
preprocess: sveltePreprocess({
sass: {
sync: true,
implementation: sass,

@ -0,0 +1,24 @@
"extends": "@tsconfig/svelte/tsconfig.json",
"compilerOptions": {
"target": "esnext",
"module": "esnext",
"moduleResolution": "node",
"resolveJsonModule": true,
"baseUrl": ".",
* Typechecking JS in `.svelte` and `.js` files by default.
* Disable checkJs if you'd like to use dynamic types in JS.
* Note that setting allowJs false does not prevent the use
* of JS in `.svelte` files.
"allowJs": true,
"checkJs": true,
"paths": {
"@/*": [
"include": ["src/**/*.d.ts", "src/**/*.ts", "src/**/*.js", "src/**/*.svelte"]

@ -0,0 +1,13 @@
import { defineConfig } from 'vite'
import { svelte } from '@sveltejs/vite-plugin-svelte'
import path from 'path';
export default defineConfig({
plugins: [svelte()],
resolve: {
alias: {
'@': path.resolve('/src'),

View file

@ -0,0 +1,16 @@
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct BandwidthEstimator(ObjectSubclass<imp::BandwidthEstimator>) @extends gst::Element, gst::Object;
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {

View file

@ -0,0 +1,24 @@
use gst::glib;
pub mod gcc;
mod signaller;
pub mod webrtcsink;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),

@ -0,0 +1,478 @@
use crate::webrtcsink::WebRTCSink;
use anyhow::{anyhow, Error};
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib::prelude::*;
use gst::glib::{self, Type};
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use webrtcsink_protocol as p;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
Some("WebRTC sink signaller"),
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
struct Settings {
address: Option<String>,
cafile: Option<PathBuf>,
impl Default for Settings {
fn default() -> Self {
Self {
address: Some("ws://".to_string()),
cafile: None,
pub struct Signaller {
state: Mutex<State>,
settings: Mutex<Settings>,
impl Signaller {
async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> {
let settings = self.settings.lock().unwrap().clone();
let connector = if let Some(path) = settings.cafile {
let cert = async_std::fs::read_to_string(&path).await?;
let cert = async_native_tls::Certificate::from_pem(cert.as_bytes())?;
let connector = async_native_tls::TlsConnector::new();
} else {
let (ws, _) = async_tungstenite::async_std::connect_async_with_tls_connector(
gst::info!(CAT, obj: element, "connected");
// Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split();
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (mut websocket_sender, mut websocket_receiver) =
let element_clone = element.downgrade();
let send_task_handle = task::spawn(async move {
while let Some(msg) = {
if let Some(element) = element_clone.upgrade() {
gst::trace!(CAT, obj: &element, "Sending websocket message {:?}", msg);
if let Some(element) = element_clone.upgrade() {
gst::info!(CAT, obj: &element, "Done sending");
Ok::<(), Error>(())
let meta = if let Some(meta) =<Option<gst::Structure>>("meta") {
} else {
.send(p::IncomingMessage::SetPeerStatus(p::PeerStatus {
roles: vec![p::PeerRole::Producer],
peer_id: None,
let element_clone = element.downgrade();
let receive_task_handle = task::spawn(async move {
while let Some(msg) = async_std::stream::StreamExt::next(&mut ws_stream).await {
if let Some(element) = element_clone.upgrade() {
match msg {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, obj: &element, "Received message {}", msg);
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Welcome { peer_id } => {
obj: &element,
"We are registered with the server, our peer id is {}",
p::OutgoingMessage::StartSession {
} => {
if let Err(err) =
element.start_session(&session_id, &peer_id)
gst::warning!(CAT, obj: &element, "{}", err);
p::OutgoingMessage::EndSession(session_info) => {
if let Err(err) =
gst::warning!(CAT, obj: &element, "{}", err);
p::OutgoingMessage::Peer(p::PeerMessage {
}) => match peer_message {
p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => {
if let Err(err) = element.handle_sdp(
) {
gst::warning!(CAT, obj: &element, "{}", err);
p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
}) => {
obj: &element,
"Ignoring offer from peer"
p::PeerMessageInner::Ice {
} => {
if let Err(err) = element.handle_ice(
) {
gst::warning!(CAT, obj: &element, "{}", err);
_ => {
obj: &element,
"Ignoring unsupported message {:?}",
} else {
obj: &element,
"Unknown message from server: {}",
anyhow!("Unknown message from server: {}", msg).into(),
Ok(WsMessage::Close(reason)) => {
obj: &element,
"websocket connection closed: {:?}",
Ok(_) => (),
Err(err) => {
anyhow!("Error receiving: {}", err).into(),
} else {
if let Some(element) = element_clone.upgrade() {
gst::info!(CAT, obj: &element, "Stopped websocket receiving");
let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(websocket_sender);
state.send_task_handle = Some(send_task_handle);
state.receive_task_handle = Some(receive_task_handle);
pub fn start(&self, element: &WebRTCSink) {
let this = self.instance().clone();
let element_clone = element.clone();
task::spawn(async move {
let this = Self::from_instance(&this);
if let Err(err) = this.connect(&element_clone).await {
pub fn handle_sdp(
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let state = self.state.lock().unwrap();
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
task::spawn(async move {
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
pub fn handle_ice(
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
let state = self.state.lock().unwrap();
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Ice {
candidate: candidate.to_string(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
task::spawn(async move {
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
pub fn stop(&self, element: &WebRTCSink) {
gst::info!(CAT, obj: element, "Stopping now");
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
task::block_on(async move {
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, obj: element, "Error while joining send task: {}", err);
if let Some(handle) = receive_task_handle {
pub fn end_session(&self, element: &WebRTCSink, session_id: &str) {
gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id);
let state = self.state.lock().unwrap();
let session_id = session_id.to_string();
let element = element.downgrade();
if let Some(mut sender) = state.websocket_sender.clone() {
task::spawn(async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id: session_id.to_string(),
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
impl ObjectSubclass for Signaller {
const NAME: &'static str = "RsWebRTCSinkSignaller";
type Type = super::Signaller;
type ParentType = glib::Object;
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
"Address of the signalling server",
"CA file",
"Path to a Certificate file to add to the set of roots the TLS connector will trust",
fn set_property(
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match {
"address" => {
let address: Option<_> = value.get().expect("type checked upstream");
if let Some(address) = address {
gst::info!(CAT, "Signaller address set to {}", address);
let mut settings = self.settings.lock().unwrap();
settings.address = Some(address);
} else {
gst::error!(CAT, "address can't be None");
"cafile" => {
let value: String = value.get().unwrap();
let mut settings = self.settings.lock().unwrap();
settings.cafile = Some(value.into());
_ => unimplemented!(),
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match {
"address" => self.settings.lock().unwrap().address.to_value(),
"cafile" => {
let settings = self.settings.lock().unwrap();
let cafile = settings.cafile.as_ref();
cafile.and_then(|file| file.to_str()).to_value()
_ => unimplemented!(),
fn serialize_value(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
Type::STRING => Some(val.get::<String>().unwrap().into()),
Type::BOOL => Some(val.get::<bool>().unwrap().into()),
Type::I32 => Some(val.get::<i32>().unwrap().into()),
Type::U32 => Some(val.get::<u32>().unwrap().into()),
Type::I_LONG | Type::I64 => Some(val.get::<i64>().unwrap().into()),
Type::U_LONG | Type::U64 => Some(val.get::<u64>().unwrap().into()),
Type::F32 => Some(val.get::<f32>().unwrap().into()),
Type::F64 => Some(val.get::<f64>().unwrap().into()),
_ => {
if let Ok(s) = val.get::<gst::Structure>() {
.filter_map(|(name, value)| {
serialize_value(value).map(|value| (name.to_string(), value))
.collect::<HashMap<String, serde_json::Value>>(),
} else if let Ok(a) = val.get::<gst::Array>() {
.filter_map(|value| serialize_value(value))
} else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
.map(|value| value.nick())
} else if let Ok(value) = val.serialize() {
} else {
gst::warning!(CAT, "Can't convert {} to json", val.type_().name());

@ -0,0 +1,62 @@
use crate::webrtcsink::{Signallable, WebRTCSink};
use gst::glib;
use gst::subclass::prelude::ObjectSubclassExt;
use std::error::Error;
mod imp;
glib::wrapper! {
pub struct Signaller(ObjectSubclass<imp::Signaller>);
unsafe impl Send for Signaller {}
unsafe impl Sync for Signaller {}
impl Signallable for Signaller {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
fn handle_sdp(
&mut self,
element: &WebRTCSink,
peer_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.handle_sdp(element, peer_id, sdp);
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>> {
let signaller = imp::Signaller::from_instance(self);
signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid);
fn stop(&mut self, element: &WebRTCSink) {
let signaller = imp::Signaller::from_instance(self);
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) {
let signaller = imp::Signaller::from_instance(self);
signaller.end_session(element, session_id);
impl Default for Signaller {
fn default() -> Self {

@ -0,0 +1,420 @@
use gst::{
glib::{self, value::FromValue},
use once_cell::sync::Lazy;
use super::imp::VideoEncoder;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
Some("WebRTC sink"),
enum IncreaseType {
/// Increase bitrate by value
/// Increase bitrate by factor
#[derive(Debug, Clone, Copy)]
enum ControllerType {
// Running the "delay-based controller"
// Running the "loss based controller"
enum CongestionControlOp {
/// Don't update target bitrate
/// Decrease target bitrate
Decrease {
factor: f64,
reason: String, // for Debug
/// Increase target bitrate, either additively or multiplicatively
fn lookup_twcc_stats(stats: &gst::StructureRef) -> Option<gst::Structure> {
for (_, field_value) in stats {
if let Ok(s) = field_value.get::<gst::Structure>() {
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
if (type_ == gst_webrtc::WebRTCStatsType::Transport
|| type_ == gst_webrtc::WebRTCStatsType::CandidatePair)
&& s.has_field("gst-twcc-stats")
return Some(s.get::<gst::Structure>("gst-twcc-stats").unwrap());
pub struct CongestionController {
/// Note: The target bitrate applied is the min of
/// target_bitrate_on_delay and target_bitrate_on_loss
/// Bitrate target based on delay factor for all video streams.
/// Hasn't been tested with multiple video streams, but
/// current design is simply to divide bitrate equally.
pub target_bitrate_on_delay: i32,
/// Bitrate target based on loss for all video streams.
pub target_bitrate_on_loss: i32,
/// Exponential moving average, updated when bitrate is
/// decreased, discarded when increased again past last
/// congestion window. Smoothing factor hardcoded.
bitrate_ema: Option<f64>,
/// Exponentially weighted moving variance, recursively
/// updated along with bitrate_ema. sqrt'd to obtain standard
/// deviation, used to determine whether to increase bitrate
/// additively or multiplicatively
bitrate_emvar: f64,
/// Used in additive mode to track last control time, influences
/// calculation of added value according to gcc section 5.5
last_update_time: Option<std::time::Instant>,
/// For logging purposes
peer_id: String,
min_bitrate: u32,
max_bitrate: u32,
impl CongestionController {
pub fn new(peer_id: &str, min_bitrate: u32, max_bitrate: u32) -> Self {
Self {
target_bitrate_on_delay: 0,
target_bitrate_on_loss: 0,
bitrate_ema: None,
bitrate_emvar: 0.,
last_update_time: None,
peer_id: peer_id.to_string(),
fn update_delay(
&mut self,
element: &super::WebRTCSink,
twcc_stats: &gst::StructureRef,
rtt: f64,
) -> CongestionControlOp {
let target_bitrate = f64::min(
self.target_bitrate_on_delay as f64,
self.target_bitrate_on_loss as f64,
// Unwrap, all those fields must be there or there's been an API
// break, which qualifies as programming error
let bitrate_sent = twcc_stats.get::<u32>("bitrate-sent").unwrap();
let bitrate_recv = twcc_stats.get::<u32>("bitrate-recv").unwrap();
let delta_of_delta = twcc_stats.get::<i64>("avg-delta-of-delta").unwrap();
let sent_minus_received = bitrate_sent.saturating_sub(bitrate_recv);
let delay_factor = sent_minus_received as f64 / target_bitrate;
let last_update_time = self.last_update_time.replace(std::time::Instant::now());
obj: element,
"consumer {}: considering stats {}",
if delay_factor > 0.1 {
let (factor, reason) = if delay_factor < 0.64 {
(0.96, format!("low delay factor {}", delay_factor))
} else {
delay_factor.sqrt().sqrt().clamp(0.8, 0.96),
format!("High delay factor {}", delay_factor),
CongestionControlOp::Decrease { factor, reason }
} else if delta_of_delta > 1_000_000 {
CongestionControlOp::Decrease {
factor: 0.97,
reason: format!("High delta: {}", delta_of_delta),
} else {
CongestionControlOp::Increase(if let Some(ema) = self.bitrate_ema {
let bitrate_stdev = self.bitrate_emvar.sqrt();
obj: element,
"consumer {}: Old bitrate: {}, ema: {}, stddev: {}",
// gcc section 5.5 advises 3 standard deviations, but experiments
// have shown this to be too low, probably related to the rest of
// homegrown algorithm not implementing gcc, revisit when implementing
// the rest of the RFC
if target_bitrate < ema - 7. * bitrate_stdev {
obj: element,
"consumer {}: below last congestion window",
/* Multiplicative increase */
} else if target_bitrate > ema + 7. * bitrate_stdev {
obj: element,
"consumer {}: above last congestion window",
/* We have gone past our last estimated max bandwidth
* network situation may have changed, go back to
* multiplicative increase
} else {
let rtt_ms = rtt * 1000.;
let response_time_ms = 100. + rtt_ms;
let time_since_last_update_ms = match last_update_time {
None => 0.,
Some(instant) => {
(self.last_update_time.unwrap() - instant).as_millis() as f64
// gcc section 5.5 advises 0.95 as the smoothing factor, but that
// seems intuitively much too low, granting disproportionate importance
// to the last measurement. 0.5 seems plenty enough, I don't have maths
// to back that up though :)
let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0);
let bits_per_frame = target_bitrate / 30.;
let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.));
let avg_packet_size_bits = bits_per_frame / packets_per_frame;
obj: element,
"consumer {}: still in last congestion window",
/* Additive increase */
IncreaseType::Additive(f64::max(1000., alpha * avg_packet_size_bits))
} else {
/* Multiplicative increase */
obj: element,
"consumer {}: outside congestion window",
fn clamp_bitrate(&mut self, bitrate: i32, n_encoders: i32, controller_type: ControllerType) {
match controller_type {
ControllerType::Loss => {
self.target_bitrate_on_loss = bitrate.clamp(
self.min_bitrate as i32 * n_encoders,
self.max_bitrate as i32 * n_encoders,
ControllerType::Delay => {
self.target_bitrate_on_delay = bitrate.clamp(
self.min_bitrate as i32 * n_encoders,
self.max_bitrate as i32 * n_encoders,
fn get_remote_inbound_stats(&self, stats: &gst::StructureRef) -> Vec<gst::Structure> {
let mut inbound_rtp_stats: Vec<gst::Structure> = Default::default();
for (_, field_value) in stats {
if let Ok(s) = field_value.get::<gst::Structure>() {
if let Ok(type_) = s.get::<gst_webrtc::WebRTCStatsType>("type") {
if type_ == gst_webrtc::WebRTCStatsType::RemoteInboundRtp {
fn lookup_rtt(&self, stats: &gst::StructureRef) -> f64 {
let inbound_rtp_stats = self.get_remote_inbound_stats(stats);
let mut rtt = 0.;
let mut n_rtts = 0u64;
for inbound_stat in &inbound_rtp_stats {
if let Err(err) = (|| -> Result<(), gst::structure::GetError<<<f64 as FromValue>::Checker as glib::value::ValueTypeChecker>::Error>> {
rtt += inbound_stat.get::<f64>("round-trip-time")?;
n_rtts += 1;
})() {
gst::debug!(CAT, "{:?}", err);
rtt /= f64::max(1., n_rtts as f64);
gst::log!(CAT, "Round trip time: {}", rtt);
pub fn loss_control(
&mut self,
element: &super::WebRTCSink,
stats: &gst::StructureRef,
encoders: &mut Vec<VideoEncoder>,
) {
let loss_percentage = stats.get::<f64>("packet-loss-pct").unwrap();
if loss_percentage > 10. {
CongestionControlOp::Decrease {
factor: ((100. - (0.5 * loss_percentage)) / 100.).clamp(0.7, 0.98),
reason: format!("High loss: {}", loss_percentage),
} else if loss_percentage > 2. {
} else {
pub fn delay_control(
&mut self,
element: &super::WebRTCSink,
stats: &gst::StructureRef,
encoders: &mut Vec<VideoEncoder>,
) {
if let Some(twcc_stats) = lookup_twcc_stats(stats) {
let op = self.update_delay(element, &twcc_stats, self.lookup_rtt(stats));
self.apply_control_op(element, encoders, op, ControllerType::Delay);
fn apply_control_op(
&mut self,
element: &super::WebRTCSink,
encoders: &mut Vec<VideoEncoder>,
control_op: CongestionControlOp,
controller_type: ControllerType,
) {
obj: element,
"consumer {}: applying congestion control operation {:?}",
let n_encoders = encoders.len() as i32;
let prev_bitrate = i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss);
match &control_op {
CongestionControlOp::Hold => {}
CongestionControlOp::Increase(IncreaseType::Additive(value)) => {
self.target_bitrate_on_delay + *value as i32,
CongestionControlOp::Increase(IncreaseType::Multiplicative(factor)) => {
(self.target_bitrate_on_delay as f64 * factor) as i32,
CongestionControlOp::Decrease { factor, .. } => {
(self.target_bitrate_on_delay as f64 * factor) as i32,
if let ControllerType::Delay = controller_type {
// Smoothing factor
let alpha = 0.75;
if let Some(ema) = self.bitrate_ema {
let sigma: f64 = (self.target_bitrate_on_delay as f64) - ema;
self.bitrate_ema = Some(ema + (alpha * sigma));
self.bitrate_emvar =
(1. - alpha) * (self.bitrate_emvar + alpha * sigma.powi(2));
} else {
self.bitrate_ema = Some(self.target_bitrate_on_delay as f64);
self.bitrate_emvar = 0.;
let target_bitrate =
i32::min(self.target_bitrate_on_delay, self.target_bitrate_on_loss).clamp(
self.min_bitrate as i32 * n_encoders,
self.max_bitrate as i32 * n_encoders,
) / n_encoders;
if target_bitrate != prev_bitrate {
"{:?} {} => {} | on delay {} - on loss {} | min {} - max {}",
let fec_ratio = {
if target_bitrate <= 2000000 || self.max_bitrate <= 2000000 {
} else {
(target_bitrate as f64 - 2000000f64) / (self.max_bitrate as f64 - 2000000f64)
let fec_percentage = (fec_ratio * 50f64) as u32;
for encoder in encoders.iter_mut() {
encoder.set_bitrate(element, target_bitrate);
.set_property("fec-percentage", fec_percentage);

@ -0,0 +1,164 @@
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::error::Error;
mod homegrown_cc;
mod imp;
glib::wrapper! {
pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
unsafe impl Send for WebRTCSink {}
unsafe impl Sync for WebRTCSink {}
#[derive(thiserror::Error, Debug)]
pub enum WebRTCSinkError {
#[error("no session with id")]
#[error("consumer refused media")]
ConsumerRefusedMedia { session_id: String, media_idx: u32 },
#[error("consumer did not provide valid payload for media")]
ConsumerNoValidPayload { session_id: String, media_idx: u32 },
#[error("SDP mline index is currently mandatory")]
#[error("duplicate session id")]
#[error("error setting up consumer pipeline")]
SessionPipelineError {
session_id: String,
peer_id: String,
details: String,
pub trait Signallable: Sync + Send + 'static {
fn start(&mut self, element: &WebRTCSink) -> Result<(), Box<dyn Error>>;
fn handle_sdp(
&mut self,
element: &WebRTCSink,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), Box<dyn Error>>;
/// sdp_mid is exposed for future proofing, see
/// at the moment sdp_m_line_index will always be Some and sdp_mid will always
/// be None
fn handle_ice(
&mut self,
element: &WebRTCSink,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>>;
fn session_ended(&mut self, element: &WebRTCSink, session_id: &str);
fn stop(&mut self, element: &WebRTCSink);
/// When providing a signaller, we expect it to both be a GObject
/// and be Signallable. This is arguably a bit strange, but exposing
/// a GInterface from rust is at the moment a bit awkward, so I went
/// for a rust interface for now. The reason the signaller needs to be
/// a GObject is to make its properties available through the GstChildProxy
/// interface.
pub trait SignallableObject: AsRef<glib::Object> + Signallable {}
impl<T: AsRef<glib::Object> + Signallable> SignallableObject for T {}
impl Default for WebRTCSink {
fn default() -> Self {
impl WebRTCSink {
pub fn with_signaller(signaller: Box<dyn SignallableObject>) -> Self {
let ret: WebRTCSink = glib::Object::new(&[]);
let ws = imp::WebRTCSink::from_instance(&ret);
pub fn handle_sdp(
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_sdp(self, session_id, sdp)
/// sdp_mid is exposed for future proofing, see
/// at the moment sdp_m_line_index must be Some
pub fn handle_ice(
session_id: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate)
pub fn handle_signalling_error(&self, error: Box<dyn Error + Send + Sync>) {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_signalling_error(self, anyhow::anyhow!(error));
pub fn start_session(&self, session_id: &str, peer_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.start_session(self, session_id, peer_id)
pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.remove_session(self, session_id, false)
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[enum_type(name = "GstWebRTCSinkCongestionControl")]
pub enum WebRTCSinkCongestionControl {
#[enum_value(name = "Disabled: no congestion control is applied", nick = "disabled")]
#[enum_value(name = "Homegrown: simple sender-side heuristic", nick = "homegrown")]
#[enum_value(name = "Google Congestion Control algorithm", nick = "gcc")]
#[glib::flags(name = "GstWebRTCSinkMitigationMode")]
enum WebRTCSinkMitigationMode {
#[flags_value(name = "No mitigation applied", nick = "none")]
NONE = 0b00000000,
#[flags_value(name = "Lowered resolution", nick = "downscaled")]
DOWNSCALED = 0b00000001,
#[flags_value(name = "Lowered framerate", nick = "downsampled")]
DOWNSAMPLED = 0b00000010,
View file

@ -0,0 +1,12 @@
version = "0.1.0"
edition = "2018"
authors = ["Mathieu Duponchelle <>"]
license = "MPL-2.0"
description = "GStreamer WebRTC sink default protocol"
repository = ""
serde = { version = "1", features = ["derive"] }
serde_json = "1"

@ -0,0 +1,144 @@
/// The default protocol used by the signalling server
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Peer {
pub id: String,
pub meta: Option<serde_json::Value>,
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Messages sent from the server to peers
pub enum OutgoingMessage {
/// Welcoming message, sets the Peer ID linked to a new connection
Welcome { peer_id: String },
/// Notifies listeners that a peer status has changed
/// Instructs a peer to generate an offer and inform about the session ID
#[serde(rename_all = "camelCase")]
StartSession { peer_id: String, session_id: String },
/// Let consumer know that the requested session is starting with the specified identifier
#[serde(rename_all = "camelCase")]
SessionStarted { peer_id: String, session_id: String },
/// Signals that the session the peer was in was ended
#[serde(rename_all = "camelCase")]
/// Messages directly forwarded from one peer to another
/// Provides the current list of consumer peers
List { producers: Vec<Peer> },
/// Notifies that an error occurred with the peer's current session
Error { details: String },
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(rename_all = "camelCase")]
/// Register with a peer type
pub enum PeerRole {
/// Register as a producer
#[serde(rename_all = "camelCase")]
/// Register as a listener
#[serde(rename_all = "camelCase")]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct PeerStatus {
pub roles: Vec<PeerRole>,
pub meta: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peer_id: Option<String>,
impl PeerStatus {
pub fn producing(&self) -> bool {
self.roles.iter().any(|t| matches!(t, PeerRole::Producer))
pub fn listening(&self) -> bool {
self.roles.iter().any(|t| matches!(t, PeerRole::Listener))
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
/// Ask the server to start a session with a producer peer
pub struct StartSessionMessage {
/// Identifies the peer
pub peer_id: String,
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Conveys a SDP
pub enum SdpMessage {
/// Conveys an offer
Offer {
/// The SDP
sdp: String,
/// Conveys an answer
Answer {
/// The SDP
sdp: String,
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "camelCase")]
/// Contents of the peer message
pub enum PeerMessageInner {
/// Conveys an ICE candidate
#[serde(rename_all = "camelCase")]
Ice {
/// The candidate string
candidate: String,
/// The mline index the candidate applies to
sdp_m_line_index: u32,
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
/// Messages directly forwarded from one peer to another
pub struct PeerMessage {
pub session_id: String,
pub peer_message: PeerMessageInner,
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// End a session
pub struct EndSessionMessage {
/// The identifier of the session to end
pub session_id: String,
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Messages received by the server from peers
pub enum IncomingMessage {
/// Internal message to let know about new peers
/// Set current peer status
/// Start a session with a producer peer
/// End an existing session
/// Send a message to a peer the sender is currently in session with
/// Retrieve the current list of producers

View file

@ -0,0 +1,26 @@
version = "0.1.0"
edition = "2018"
authors = ["Mathieu Duponchelle <>"]
license = "MPL-2.0"
description = "GStreamer WebRTC sink signalling server"
repository = ""
anyhow = "1"
async-std = { version = "1", features = ["unstable", "attributes"] }
async-native-tls = "0.4"
async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
clap = { version = "4", features = ["derive"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-log = "0.1"
futures = "0.3"
uuid = { version = "1", features = ["v4"] }
thiserror = "1"
test-log = { version = "0.2", features = ["trace"], default-features = false }
pin-project-lite = "0.2"
webrtcsink-protocol = { version = "0.1", path="../protocol" }

@ -0,0 +1,101 @@
use async_std::task;
use clap::Parser;
use tracing_subscriber::prelude::*;
use webrtcsink_signalling::handlers::Handler;
use webrtcsink_signalling::server::Server;
use anyhow::Error;
use async_native_tls::TlsAcceptor;
use async_std::fs::File as AsyncFile;
use async_std::net::TcpListener;
use tracing::{info, warn};
#[derive(Parser, Debug)]
#[clap(about, version, author)]
/// Program arguments
struct Args {
/// Address to listen on
#[clap(short, long, default_value = "")]
host: String,
/// Port to listen on
#[clap(short, long, default_value_t = 8443)]
port: u16,
/// TLS certificate to use
#[clap(short, long)]
cert: Option<String>,
/// password to TLS certificate
cert_password: Option<String>,
fn initialize_logging(envvar_name: &str) -> Result<(), Error> {
let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
let subscriber = tracing_subscriber::Registry::default()
fn main() -> Result<(), Error> {
let args = Args::parse();
let server = Server::spawn(|stream| Handler::new(stream));
task::block_on(async move {
let addr = format!("{}:{}",, args.port);
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&addr).await?;
let acceptor = match args.cert {
Some(cert) => {
let key = AsyncFile::open(cert).await?;
Some(TlsAcceptor::new(key, args.cert_password.as_deref().unwrap_or("")).await?)
None => None,
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let mut server_clone = server.clone();
let address = match stream.peer_addr() {
Ok(address) => address,
Err(err) => {
warn!("Connected peer with no address: {}", err);
info!("Accepting connection from {}", address);
if let Some(ref acceptor) = acceptor {
let stream = match acceptor.accept(stream).await {
Ok(stream) => stream,
Err(err) => {
warn!("Failed to accept TLS connection from {}: {}", address, err);
task::spawn(async move { server_clone.accept_async(stream).await });
} else {
task::spawn(async move { server_clone.accept_async(stream).await });

@ -0,0 +1,2 @@
pub mod handlers;
pub mod server;

@ -0,0 +1,218 @@
use anyhow::Error;
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::{AsyncRead, AsyncWrite};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tracing::{info, instrument, trace, warn};
struct Peer {
receive_task_handle: task::JoinHandle<()>,
send_task_handle: task::JoinHandle<Result<(), Error>>,
sender: mpsc::Sender<String>,
struct State {
tx: Option<mpsc::Sender<(String, Option<String>)>>,
peers: HashMap<String, Peer>,
pub struct Server {
state: Arc<Mutex<State>>,
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
#[error("error during handshake {0}")]
Handshake(#[from] async_tungstenite::tungstenite::Error),
impl Server {
#[instrument(level = "debug", skip(factory))]
pub fn spawn<
I: for<'a> Deserialize<'a>,
O: Serialize + std::fmt::Debug,
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St,
St: Stream<Item = (String, O)>,
factory: Factory,
) -> Self
O: Serialize + std::fmt::Debug,
St: Send + Unpin + 'static,
let (tx, rx) = mpsc::channel::<(String, Option<String>)>(1000);
let mut handler = factory(Box::pin(rx.filter_map(|(peer_id, msg)| async move {
if let Some(msg) = msg {
match serde_json::from_str::<I>(&msg) {
Ok(msg) => Some((peer_id, Some(msg))),
Err(err) => {
warn!("Failed to parse incoming message: {} ({})", err, msg);
} else {
Some((peer_id, None))
let state = Arc::new(Mutex::new(State {
tx: Some(tx),
peers: HashMap::new(),
let state_clone = state.clone();
let _ = task::spawn(async move {
while let Some((peer_id, msg)) = {
match serde_json::to_string(&msg) {
Ok(msg) => {
if let Some(peer) = state_clone.lock().unwrap().peers.get_mut(&peer_id) {
let mut sender = peer.sender.clone();
task::spawn(async move {
let _ = sender.send(msg).await;
Err(err) => {
warn!("Failed to serialize outgoing message: {}", err);
Self { state }
#[instrument(level = "debug", skip(state))]
fn remove_peer(state: Arc<Mutex<State>>, peer_id: &str) {
if let Some(mut peer) = state.lock().unwrap().peers.remove(peer_id) {
let peer_id = peer_id.to_string();
task::spawn(async move {
if let Err(err) = peer.send_task_handle.await {
trace!(peer_id = %peer_id, "Error while joining send task: {}", err);
#[instrument(level = "debug", skip(self, stream))]
pub async fn accept_async<S: 'static>(&mut self, stream: S) -> Result<String, ServerError>
S: AsyncRead + AsyncWrite + Unpin + Send,
let ws = match async_tungstenite::accept_async(stream).await {
Ok(ws) => ws,
Err(err) => {
warn!("Error during the websocket handshake: {}", err);
return Err(ServerError::Handshake(err));
let this_id = uuid::Uuid::new_v4().to_string();
info!(this_id = %this_id, "New WebSocket connection");
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<String>(1000);
let this_id_clone = this_id.clone();
let (mut ws_sink, mut ws_stream) = ws.split();
let send_task_handle = task::spawn(async move {
loop {
match async_std::future::timeout(
Ok(Some(msg)) => {
trace!(this_id = %this_id_clone, "sending {}", msg);
Ok(None) => {
Err(_) => {
trace!(this_id = %this_id_clone, "timeout, sending ping");
Ok::<(), Error>(())
let mut tx = self.state.lock().unwrap().tx.clone();
let this_id_clone = this_id.clone();
let state_clone = self.state.clone();
let receive_task_handle = task::spawn(async move {
if let Some(tx) = tx.as_mut() {
if let Err(err) = tx
"type": "newPeer",
warn!(this = %this_id_clone, "Error handling message: {:?}", err);
while let Some(msg) = {
info!("Received message {msg:?}");
match msg {
Ok(WsMessage::Text(msg)) => {
if let Some(tx) = tx.as_mut() {
if let Err(err) = tx.send((this_id_clone.clone(), Some(msg))).await {
warn!(this = %this_id_clone, "Error handling message: {:?}", err);
Ok(WsMessage::Close(reason)) => {
info!(this_id = %this_id_clone, "connection closed: {:?}", reason);
Ok(WsMessage::Pong(_)) => {
Ok(_) => warn!(this_id = %this_id_clone, "Unsupported message type"),
Err(err) => {
warn!(this_id = %this_id_clone, "recv error: {}", err);
if let Some(tx) = tx.as_mut() {
let _ = tx.send((this_id_clone.clone(), None)).await;
Self::remove_peer(state_clone, &this_id_clone);
Peer {
sender: websocket_sender,

View file

@ -0,0 +1,38 @@
<!DOCTYPE html>
vim: set sts=2 sw=2 et :
Demo Javascript app for negotiating and streaming a sendrecv webrtc stream
with a GStreamer app. Runs only in passive mode, i.e., responds to offers
with answers, exchanges ICE candidates, and streams.
Author: Nirbheek Chauhan <>
.error { color: red; }
<link rel="stylesheet" type="text/css" href="theme.css">
<script src=""></script>
<script src="keyboard.js"></script>
<script src="input.js"></script>
<script src="webrtc.js"></script>
<script>window.onload = setup;</script>
<div class="holygrail-body">
<div class="content">
<div id="sessions">
<div id="image-holder">
<img id="image"></img>
<ul class="nav" id="camera-list">

net/webrtc/www/input.js Normal file
View file

@ -0,0 +1,482 @@
* Copyright 2019 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
/*global GamepadManager*/
/*eslint no-unused-vars: ["error", { "vars": "local" }]*/
class Input {
* Input handling for WebRTC web app
* @constructor
* @param {Element} [element]
* Video element to attach events to
* @param {function} [send]
* Function used to send input events to server.
constructor(element, send) {
* @type {Element}
this.element = element;
* @type {function}
this.send = send;
* @type {boolean}
this.mouseRelative = false;
* @type {Object}
this.m = null;
* @type {Keyboard}
this.keyboard = null;
* @type {GamepadManager}
this.gamepadManager = null;
* @type {Integer}
this.x = 0;
* @type {Integer}
this.y = 0;
* @type {Integer}
this.lastTouch = 0;
* @type {function}
this.ongamepadconnected = null;
* @type {function}
this.ongamepaddisconneceted = null;
* List of attached listeners, record keeping used to detach all.
* @type {Array}
this.listeners = [];
* @type {function}
this.onresizeend = null;
// internal variables used by resize start/end functions.
this._rtime = null;
this._rtimeout = false;
this._rdelta = 200;
* Handles mouse button and motion events and sends them to WebRTC app.
* @param {MouseEvent} event
_mouseButtonMovement(event) {
const down = (event.type === 'mousedown' ? 1 : 0);
var data = {};
if (event.type === 'mousemove' && !this.m) return;
if (!document.pointerLockElement) {
if (this.mouseRelative);
// Hotkey to enable pointer lock, CTRL-SHIFT-LeftButton
if (down && event.button === 0 && event.ctrlKey && event.shiftKey) {;
if (document.pointerLockElement) {
// FIXME - mark as relative!
console.warn("FIXME: Make event relative!")
this.x = event.movementX;
this.y = event.movementY;
} else if (event.type === 'mousemove') {
this.x = this._clientToServerX(event.clientX);
this.y = this._clientToServerY(event.clientY);
data["event"] = "MouseMove"
if (event.type === 'mousedown') {
data["event"] = "MouseButtonPress";
} else if (event.type === 'mouseup') {
data["event"] = "MouseButtonRelease";
if (event.type === 'mousedown' || event.type === 'mouseup') {
data["button"] = event.button + 1;
data["x"] = this.x;
data["y"] = this.y;
data["modifier_state"] = this._modifierState(event);
* Handles touch events and sends them to WebRTC app.
* @param {TouchEvent} event
_touch(event) {
var mod_state = this._modifierState(event);
// Use TouchUp for cancelled touch points
if (event.type === 'touchcancel') {
let data = {};
data["event"] = "TouchUp";
data["identifier"] = event.changedTouches[0].identifier;
data["x"] = this._clientToServerX(event.changedTouches[0].clientX);
data["y"] = this._clientToServerY(event.changedTouches[0].clientY);
data["modifier_state"] = mod_state;
if (event.type === 'touchstart') {
var event_name = "TouchDown";
} else if (event.type === 'touchmove') {
var event_name = "TouchMotion";
} else if (event.type === 'touchend') {
var event_name = "TouchUp";
for (let touch of event.changedTouches) {
let data = {};
data["event"] = event_name;
data["identifier"] = touch.identifier;
data["x"] = this._clientToServerX(touch.clientX);
data["y"] = this._clientToServerY(touch.clientY);
data["modifier_state"] = mod_state;
if (event.type !== 'touchend') {
if ('force' in touch) {
data["pressure"] = touch.force;
} else {
data["pressure"] = NaN;
if (event.timeStamp > this.lastTouch) {
let data = {};
data["event"] = "TouchFrame";
data["modifier_state"] = mod_state;
this.lastTouch = event.timeStamp;
* Handles mouse wheel events and sends them to WebRTC app.
* @param {MouseEvent} event
_wheel(event) {
let data = {
"event": "MouseScroll",
"x": this.x,
"y": this.y,
"delta_x": -event.deltaX,
"delta_y": -event.deltaY,
"modifier_state": this._modifierState(event),
* Captures mouse context menu (right-click) event and prevents event propagation.
* @param {MouseEvent} event
_contextMenu(event) {
* Sends WebRTC app command to hide the remote pointer when exiting pointer lock.
_exitPointerLock() {
* constructs the string representation for the active modifiers on the event
_modifierState(event) {
let masks = []
if (event.altKey) masks.push("alt-mask");
if (event.ctrlKey) masks.push("control-mask");
if (event.metaKey) masks.push("meta-mask");
if (event.shiftKey) masks.push("shift-mask");
return masks.join('+')
* Captures display and video dimensions required for computing mouse pointer position.
* This should be fired whenever the window size changes.
_windowMath() {
const windowW = this.element.offsetWidth;
const windowH = this.element.offsetHeight;
const frameW = this.element.videoWidth;
const frameH = this.element.videoHeight;
const multi = Math.min(windowW / frameW, windowH / frameH);
const vpWidth = frameW * multi;
const vpHeight = (frameH * multi);
var elem = this.element;
var offsetLeft = 0;
var offsetTop = 0;
do {
if (!isNaN(elem.offsetLeft)) {
offsetLeft += elem.offsetLeft;
if (!isNaN(elem.offsetTop)) {
offsetTop += elem.offsetTop;
} while (elem = elem.offsetParent);
this.m = {
mouseMultiX: frameW / vpWidth,
mouseMultiY: frameH / vpHeight,
mouseOffsetX: Math.max((windowW - vpWidth) / 2.0, 0),
mouseOffsetY: Math.max((windowH - vpHeight) / 2.0, 0),
offsetLeft: offsetLeft,
offsetTop: offsetTop,
scrollX: window.scrollX,
scrollY: window.scrollY,
* Translates pointer position X based on current window math.
* @param {Integer} clientX
_clientToServerX(clientX) {
var serverX = Math.round((clientX - this.m.mouseOffsetX - this.m.offsetLeft + this.m.scrollX) * this.m.mouseMultiX);
if (serverX === this.m.frameW - 1) serverX = this.m.frameW;
if (serverX > this.m.frameW) serverX = this.m.frameW;
if (serverX < 0) serverX = 0;
return serverX;
* Translates pointer position Y based on current window math.
* @param {Integer} clientY
_clientToServerY(clientY) {
let serverY = Math.round((clientY - this.m.mouseOffsetY - this.m.offsetTop + this.m.scrollY) * this.m.mouseMultiY);
if (serverY === this.m.frameH - 1) serverY = this.m.frameH;
if (serverY > this.m.frameH) serverY = this.m.frameH;
if (serverY < 0) serverY = 0;
return serverY;
* When fullscreen is entered, request keyboard and pointer lock.
_onFullscreenChange() {
if (document.fullscreenElement !== null) {
// Enter fullscreen
// Reset local keyboard. When holding to exit full-screen the escape key can get stuck.
// Reset stuck keys on server side.
// FIXME: How to implement resetting keyboard with the GstNavigation interface
// this.send("kr");
* Called when window is being resized, used to detect when resize ends so new resolution can be sent.
_resizeStart() {
this._rtime = new Date();
if (this._rtimeout === false) {
this._rtimeout = true;
setTimeout(() => { this._resizeEnd() }, this._rdelta);
* Called in setTimeout loop to detect if window is done being resized.
_resizeEnd() {
if (new Date() - this._rtime < this._rdelta) {
setTimeout(() => { this._resizeEnd() }, this._rdelta);
} else {
this._rtimeout = false;
if (this.onresizeend !== null) {
* Attaches input event handles to docuemnt, window and element.
attach() {
this.listeners.push(addListener(this.element, 'resize', this._windowMath, this));
this.listeners.push(addListener(this.element, 'wheel', this._wheel, this));
this.listeners.push(addListener(this.element, 'contextmenu', this._contextMenu, this));
this.listeners.push(addListener(this.element.parentElement, 'fullscreenchange', this._onFullscreenChange, this));
this.listeners.push(addListener(window, 'resize', this._windowMath, this));
this.listeners.push(addListener(window, 'resize', this._resizeStart, this));
if ('ontouchstart' in window) {
console.warning("FIXME: Enabling mouse pointer display for touch devices.");
} else {
this.listeners.push(addListener(this.element, 'mousemove', this._mouseButtonMovement, this));
this.listeners.push(addListener(this.element, 'mousedown', this._mouseButtonMovement, this));
this.listeners.push(addListener(this.element, 'mouseup', this._mouseButtonMovement, this));
this.listeners.push(addListener(this.element, 'touchstart', this._touch, this));
this.listeners.push(addListener(this.element, 'touchend', this._touch, this));
this.listeners.push(addListener(this.element, 'touchmove', this._touch, this));
this.listeners.push(addListener(this.element, 'touchcancel', this._touch, this));
// Adjust for scroll offset
this.listeners.push(addListener(window, 'scroll', () => {
this.m.scrollX = window.scrollX;
this.m.scrollY = window.scrollY;
}, this));
// Using guacamole keyboard because it has the keysym translations.
this.keyboard = new Keyboard(this.element);
this.keyboard.onkeydown = (keysym, state) => {
this.send({"event": "KeyPress", "key": keysym, "modifier_state": state});
this.keyboard.onkeyup = (keysym, state) => {
this.send({"event": "KeyRelease", "key": keysym, "modifier_state": state});
detach() {
if (this.keyboard) {
this.keyboard.onkeydown = null;
this.keyboard.onkeyup = null;
delete this.keyboard;
// FIXME: How to implement resetting keyboard with the GstNavigation interface
// this.send("kr");
* Request keyboard lock, must be in fullscreen mode to work.
requestKeyboardLock() {
// event codes:
const keys = [
console.log("requesting keyboard lock");
() => {
console.log("keyboard lock success");
(e) => {
console.log("keyboard lock failed: ", e);
getWindowResolution() {
return [
parseInt(this.element.offsetWidth * window.devicePixelRatio),
parseInt(this.element.offsetHeight * window.devicePixelRatio)
* Helper function to keep track of attached event listeners.
* @param {Object} obj
* @param {string} name
* @param {function} func
* @param {Object} ctx
function addListener(obj, name, func, ctx) {
const newFunc = ctx ? func.bind(ctx) : func;
obj.addEventListener(name, newFunc);
return [obj, name, newFunc];
* Helper function to remove all attached event listeners.
* @param {Array} listeners
function removeListeners(listeners) {
for (const listener of listeners)
listener[0].removeEventListener(listener[1], listener[2]);

View file

@ -0,0 +1,141 @@
/* Reset CSS from Eric Meyer */
html, body, div, span, applet, object, iframe,
h1, h2, h3, h4, h5, h6, p, blockquote, pre,
a, abbr, acronym, address, big, cite, code,
del, dfn, em, img, ins, kbd, q, s, samp,
small, strike, strong, sub, sup, tt, var,
b, u, i, center,
dl, dt, dd, ol, ul, li,
fieldset, form, label, legend,
table, caption, tbody, tfoot, thead, tr, th, td,
article, aside, canvas, details, embed,
figure, figcaption, footer, header, hgroup,
menu, nav, output, ruby, section, summary,
time, mark, audio, video {
margin: 0;
padding: 0;
border: 0;
font-size: 100%;
font: inherit;
vertical-align: baseline;
/* HTML5 display-role reset for older browsers */
article, aside, details, figcaption, figure,
footer, header, hgroup, menu, nav, section {
display: block;
body {
line-height: 1;
ol, ul {
list-style: none;
blockquote, q {
quotes: none;
blockquote:before, blockquote:after,
q:before, q:after {
content: '';
content: none;
table {
border-collapse: collapse;
border-spacing: 0;
/* Our style */
display: flex;
flex-direction: column;
min-height: 100vh;
background-color: #222;
color: white;
.holygrail-body {
flex: 1 0 auto;
display: flex;
.holygrail-body .content {
width: 100%;
#sessions {
display: flex;
flex-direction: row;
flex-wrap: wrap;
justify-content: space-around;
.holygrail-body .nav {
width: 220px;
list-style: none;
text-align: left;
order: -1;
background-color: #333;
margin: 0;
@media (max-width: 700px) {
.holygrail-body {
flex-direction: column;
.holygrail-body .nav {
width: 100%;
.session p span {
float: right;
.session p {
padding-top: 5px;
padding-bottom: 5px;
.stream {
background-color: black;
width: 480px;
#camera-list {
text-align: center;
.button {
border: none;
padding: 8px;
text-align: center;
text-decoration: none;
display: inline-block;
font-size: 16px;
-webkit-transition-duration: 0.4s; /* Safari */
transition-duration: 0.4s;
cursor: pointer;
margin: 5px auto;
width: 90%;
.button1 {
background-color: #222;
color: white;
border: 2px solid #4CAF50;
word-wrap: anywhere;
.button1:hover {
background-color: #4CAF50;
color: white;
#image-holder {
display: flex;
flex-direction: row;
flex-wrap: wrap;
justify-content: space-around;

View file

@ -0,0 +1,466 @@
/* vim: set sts=4 sw=4 et :
* Demo Javascript app for negotiating and streaming a sendrecv webrtc stream
* with a GStreamer app. Runs only in passive mode, i.e., responds to offers
* with answers, exchanges ICE candidates, and streams.
* Author: Nirbheek Chauhan <>
// Set this to override the automatic detection in websocketServerConnect()
var ws_server;
var ws_port;
// Override with your own STUN servers if you want
var rtc_configuration = {iceServers: [{urls: ""},
/* TODO: do not keep these static and in clear text in production,
* and instead use one of the mechanisms discussed in
{'urls': '',
'credential': '1qaz2wsx',
'username': 'test'
/* Uncomment the following line to ensure the turn server is used
* while testing. This should be kept commented out in production,
* as non-relay ice candidates should be preferred
// iceTransportPolicy: "relay",
var sessions = {}
/* */
function getOurId() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
function Uint8ToString(u8a){
var CHUNK_SZ = 0x8000;
var c = [];
for (var i=0; i < u8a.length; i+=CHUNK_SZ) {
c.push(String.fromCharCode.apply(null, u8a.subarray(i, i+CHUNK_SZ)));
return c.join("");
function Session(our_id, peer_id, closed_callback) { = null;
this.peer_connection = null;
this.ws_conn = null;
this.peer_id = peer_id;
this.our_id = our_id;
this.closed_callback = closed_callback;
this.data_channel = null;
this.input = null;
this.getVideoElement = function() {
return document.getElementById("stream-" + this.our_id);
this.resetState = function() {
if (this.peer_connection) {
this.peer_connection = null;
var videoElement = this.getVideoElement();
if (videoElement) {
videoElement.src = "";
var session_div = document.getElementById("session-" + this.our_id);
if (session_div) {
if (this.ws_conn) {
this.ws_conn = null;
this.input && this.input.detach();
this.data_channel = null;
this.handleIncomingError = function(error) {
this.setStatus = function(text) {
var span = document.getElementById("status-" + this.our_id);
// Don't set the status if it already contains an error
if (!span.classList.contains('error'))
span.textContent = text;
this.setError = function(text) {
var span = document.getElementById("status-" + this.our_id);
span.textContent = text;
// Local description was set, send it to peer
this.onLocalDescription = function(desc) {
console.log("Got local description: " + JSON.stringify(desc), this);
var thiz = this;
this.peer_connection.setLocalDescription(desc).then(() => {
this.setStatus("Sending SDP answer");
var sdp = {
'type': 'peer',
'sdp': this.peer_connection.localDescription.toJSON()
}).catch(function(e) {
this.onRemoteDescriptionSet = function() {
this.setStatus("Remote SDP set");
this.setStatus("Got SDP offer");
// SDP offer received from peer, set remote description and create an answer
this.onIncomingSDP = function(sdp) {
var thiz = this;
.catch(function(e) {
// ICE candidate received from peer, add it to the peer connection
this.onIncomingICE = function(ice) {
var candidate = new RTCIceCandidate(ice);
var thiz = this;
this.peer_connection.addIceCandidate(candidate).catch(function(e) {
this.onServerMessage = function(event) {
console.log("Received " +;
try {
msg = JSON.parse(;
} catch (e) {
if (e instanceof SyntaxError) {
this.handleIncomingError("Error parsing incoming JSON: " +;
} else {
this.handleIncomingError("Unknown error parsing response: " +;
if (msg.type == "registered") {
this.setStatus("Registered with server");
} else if (msg.type == "sessionStarted") {
this.setStatus("Registered with server"); = msg.sessionId;
} else if (msg.type == "error") {
} else if (msg.type == "endSession") {
} else if (msg.type == "peer") {
// Incoming peer message signals the beginning of a call
if (!this.peer_connection)
if (msg.sdp != null) {
} else if ( != null) {
} else {
this.handleIncomingError("Unknown incoming JSON: " + msg);
this.streamIsPlaying = function(e) {
this.onServerClose = function(event) {
this.onServerError = function(event) {
this.handleIncomingError('Server error');
this.websocketServerConnect = function() {
// Clear errors in the status span
var span = document.getElementById("status-" + this.our_id);
span.textContent = '';
console.log("Our ID:", this.our_id);
var ws_port = ws_port || '8443';
if (window.location.protocol.startsWith ("file")) {
var ws_server = ws_server || "";
} else if (window.location.protocol.startsWith ("http")) {
var ws_server = ws_server || window.location.hostname;
} else {
throw new Error ("Don't know how to connect to the signalling server with uri" + window.location);
var ws_url = 'ws://' + ws_server + ':' + ws_port
this.setStatus("Connecting to server " + ws_url);
this.ws_conn = new WebSocket(ws_url);
/* When connected, immediately register with the server */
this.ws_conn.addEventListener('open', (event) => {
this.setStatus("Connecting to the peer");
this.ws_conn.addEventListener('error', this.onServerError.bind(this));
this.ws_conn.addEventListener('message', this.onServerMessage.bind(this));
this.ws_conn.addEventListener('close', this.onServerClose.bind(this));
this.connectPeer = function() {
this.setStatus("Connecting " + this.peer_id);
"type": "startSession",
"peerId": this.peer_id
this.onRemoteStreamAdded = function(event) {
var videoTracks =;
var audioTracks =;
if (videoTracks.length > 0) {
console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks');
this.getVideoElement().srcObject =;
} else {
this.handleIncomingError('Stream with unknown tracks added, resetting');
this.createCall = function(msg) {
console.log('Creating RTCPeerConnection');
this.peer_connection = new RTCPeerConnection(rtc_configuration);
this.peer_connection.onaddstream = this.onRemoteStreamAdded.bind(this);
this.peer_connection.ondatachannel = (event) => {
console.log(`Data channel created: ${}`);
this.data_channel =;
video_element = this.getVideoElement();
if (video_element) {
this.input = new Input(video_element, (data) => {
if (this.data_channel) {
console.log(`Navigation data: ${data}`);
this.data_channel.onopen = (event) => {
console.log("Receive channel opened, attaching input");
this.data_channel.onclose = (event) => {"Receive channel closed");
this.input && this.input.detach();
this.data_channel = null;
this.data_channel.onerror = (event) => {
this.input && this.input.detach();
console.warn("Error on receive channel",;
this.data_channel = null;
let buffer = [];
this.data_channel.onmessage = (event) => {
if (typeof === 'string' || instanceof String) {
if ( == 'BEGIN_IMAGE')
buffer = [];
else if ( == 'END_IMAGE') {
var decoder = new TextDecoder("ascii");
var array_buffer = new Uint8Array(buffer);
var str = decoder.decode(array_buffer);
let img = document.getElementById("image");
img.src = 'data:image/png;base64, ' + str;
} else {
var i, len = buffer.length
var view = new DataView(;
for (i = 0; i < view.byteLength; i++) {
buffer[len + i] = view.getUint8(i);
this.peer_connection.onicecandidate = (event) => {
if (event.candidate == null) {
console.log("ICE Candidate was null, done");
"type": "peer",
"ice": event.candidate.toJSON()
this.setStatus("Created peer connection for call, waiting for SDP");
document.getElementById("stream-" + this.our_id).addEventListener("playing", this.streamIsPlaying.bind(this), false);
function startSession() {
var peer_id = document.getElementById("camera-id").value;
if (peer_id === "") {
sessions[peer_id] = new Session(peer_id);
function session_closed(peer_id) {
sessions[peer_id] = null;
function addPeer(peer_id, meta) {
console.log("Meta: ", JSON.stringify(meta));
var nav_ul = document.getElementById("camera-list");
meta = meta ? meta : {"display-name": peer_id};
let display_html = `${meta["display-name"] ? meta["display-name"] : peer_id}<ul>`;
for (const key in meta) {
if (key != "display-name") {
display_html += `<li>- ${key}: ${meta[key]}</li>`;
display_html += "</ul>"
var li_str = '<li id="peer-' + peer_id + '"><button class="button button1">' + display_html + '</button></li>';
nav_ul.insertAdjacentHTML('beforeend', li_str);
var li = document.getElementById("peer-" + peer_id);
li.onclick = function(e) {
var sessions_div = document.getElementById('sessions');
var our_id = getOurId();
var session_div_str = '<div class="session" id="session-' + our_id + '"><video preload="none" class="stream" id="stream-' + our_id + '"></video><p>Status: <span id="status-' + our_id + '">unknown</span></p></div>'
sessions_div.insertAdjacentHTML('beforeend', session_div_str);
sessions[peer_id] = new Session(our_id, peer_id, session_closed);
function clearPeers() {
var nav_ul = document.getElementById("camera-list");
while (nav_ul.firstChild) {
function onServerMessage(event) {
console.log("Received " +;
try {
msg = JSON.parse(;
} catch (e) {
if (e instanceof SyntaxError) {
console.error("Error parsing incoming JSON: " +;
} else {
console.error("Unknown error parsing response: " +;
if (msg.type == "welcome") {`Got welcomed with ID ${msg.peer_id}`);
"type": "list"
} else if (msg.type == "list") {
for (i = 0; i < msg.producers.length; i++) {
addPeer(msg.producers[i].id, msg.producers[i].meta);
} else if (msg.type == "peerStatusChanged") {
var li = document.getElementById("peer-" + msg.peerId);
if (msg.roles.includes("producer")) {
if (li == null) {
console.error('Adding peer');
addPeer(msg.peerId, msg.meta);
} else if (li != null) {
} else {
console.error("Unsupported message: ", msg);
function clearConnection() {
ws_conn.removeEventListener('error', onServerError);
ws_conn.removeEventListener('message', onServerMessage);
ws_conn.removeEventListener('close', onServerClose);
ws_conn = null;
function onServerClose(event) {
window.setTimeout(connect, 1000);
function onServerError(event) {
console.log("Error", event);
window.setTimeout(connect, 1000);
function connect() {
var ws_port = ws_port || '8443';
if (window.location.protocol.startsWith ("file")) {
var ws_server = ws_server || "";
} else if (window.location.protocol.startsWith ("http")) {
var ws_server = ws_server || window.location.hostname;
} else {
throw new Error ("Don't know how to connect to the signalling server with uri" + window.location);
var ws_url = 'ws://' + ws_server + ':' + ws_port
console.log("Connecting listener");
ws_conn = new WebSocket(ws_url);
ws_conn.addEventListener('open', (event) => {
"type": "setPeerStatus",
"roles": ["listener"]
ws_conn.addEventListener('error', onServerError);
ws_conn.addEventListener('message', onServerMessage);
ws_conn.addEventListener('close', onServerClose);
function setup() {