signalling: update protocol, reimplement in rust

This commit is contained in:
Mathieu Duponchelle 2022-03-03 04:30:44 +01:00
parent 7f000ea42b
commit bd88395859
16 changed files with 1933 additions and 572 deletions

379
Cargo.lock generated
View file

@ -13,9 +13,19 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.53"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"
checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]]
name = "async-attributes"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "async-channel"
@ -44,9 +54,9 @@ dependencies = [
[[package]]
name = "async-global-executor"
version = "2.0.2"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6"
checksum = "c026b7e44f1316b567ee750fea85103f87fcb80792b860e979f221259796ca0a"
dependencies = [
"async-channel",
"async-executor",
@ -79,9 +89,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "2.4.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6"
dependencies = [
"event-listener",
]
@ -107,6 +117,18 @@ dependencies = [
"url",
]
[[package]]
name = "async-native-tls"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe"
dependencies = [
"futures-util",
"native-tls",
"thiserror",
"url",
]
[[package]]
name = "async-process"
version = "1.3.0"
@ -130,6 +152,7 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952"
dependencies = [
"async-attributes",
"async-channel",
"async-global-executor",
"async-io",
@ -154,17 +177,17 @@ dependencies = [
[[package]]
name = "async-task"
version = "4.0.3"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"
[[package]]
name = "async-tungstenite"
version = "0.16.1"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5682ea0913e5c20780fe5785abacb85a411e7437bf52a1bedb93ddb3972cb8dd"
checksum = "7922abeade7dd8948c20dfa1f85dc48cc952d2e0791f7c42b8b1cbb07a57129d"
dependencies = [
"async-native-tls",
"async-native-tls 0.4.0",
"async-std",
"futures-io",
"futures-util",
@ -192,9 +215,9 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.0.1"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "base64"
@ -210,18 +233,18 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.9.0"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
dependencies = [
"generic-array",
]
[[package]]
name = "blocking"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427"
checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc"
dependencies = [
"async-channel",
"async-task",
@ -257,15 +280,15 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]]
name = "cc"
version = "1.0.72"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cfg-expr"
version = "0.9.0"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edae0b9625d1fce32f7d64b71784d9b1bf8469ec1a9c417e44aaf16a9cbd7571"
checksum = "5e068cb2806bbc15b439846dc16c5f89f8599f2c3e4d73d4449d38f9b2f0b6c5"
dependencies = [
"smallvec",
]
@ -290,9 +313,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.12"
version = "3.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2afefa54b5c7dd40918dc1e09f213a171ab5937aadccab45e804780b238f9f43"
checksum = "d8c93436c21e4698bacadf42917db28b23017027a4deccb35dbe47a7e7840123"
dependencies = [
"atty",
"bitflags",
@ -307,11 +330,11 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "3.0.12"
version = "3.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fd2078197a22f338bd4fbf7d6387eb6f0d6a3c69e6cbc09f5c93e97321fd92a"
checksum = "da95d038ede1a964ce99f49cbe27a7fb538d1da595e4b4f70b8c8f338d17bf16"
dependencies = [
"heck 0.4.0",
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
@ -329,9 +352,9 @@ dependencies = [
[[package]]
name = "core-foundation"
version = "0.9.2"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
dependencies = [
"core-foundation-sys",
"libc",
@ -354,14 +377,24 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.6"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "crypto-common"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "ctor"
version = "0.1.21"
@ -374,11 +407,12 @@ dependencies = [
[[package]]
name = "digest"
version = "0.9.0"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [
"generic-array",
"block-buffer",
"crypto-common",
]
[[package]]
@ -427,17 +461,11 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fragile"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da1b8f89c5b5a5b7e59405cfcf0bb9588e5ed19f0b57a4cd542bbba3f164a6d"
[[package]]
name = "futures"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4"
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
dependencies = [
"futures-channel",
"futures-core",
@ -450,9 +478,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
"futures-sink",
@ -460,15 +488,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "futures-executor"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a"
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
dependencies = [
"futures-core",
"futures-task",
@ -477,9 +505,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-lite"
@ -498,9 +526,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
dependencies = [
"proc-macro2",
"quote",
@ -509,21 +537,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-util"
version = "0.3.19"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-channel",
"futures-core",
@ -549,9 +577,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
dependencies = [
"cfg-if",
"libc",
@ -560,9 +588,9 @@ dependencies = [
[[package]]
name = "glib"
version = "0.15.3"
version = "0.15.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a703581e2538fe699c5476cf26b456d694c5272b6e999d3ab47711c5eaa2dd2"
checksum = "89528258cfdc79b1e54591202705be67896ad254f99e3cc2ea3710e0307148f2"
dependencies = [
"bitflags",
"futures-channel",
@ -580,12 +608,12 @@ dependencies = [
[[package]]
name = "glib-macros"
version = "0.15.3"
version = "0.15.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e58b262ff65ef771003873cea8c10e0fe854f1c508d48d62a4111a1ff163f7d1"
checksum = "41bfd8d227dead0829ac142454e97531b93f576d0805d779c42bfd799c65c572"
dependencies = [
"anyhow",
"heck 0.4.0",
"heck",
"proc-macro-crate",
"proc-macro-error",
"proc-macro2",
@ -595,9 +623,9 @@ dependencies = [
[[package]]
name = "glib-sys"
version = "0.15.1"
version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c668102c6e15e0a7f6b99b59f602c2e806967bb86414f617b77e19b1de5b3fac"
checksum = "19289e4953dad38c9fea1c5c52cc594f29afc4a70802822ef382dca356b27bfd"
dependencies = [
"libc",
"system-deps",
@ -605,22 +633,21 @@ dependencies = [
[[package]]
name = "gloo-timers"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f16c88aa13d2656ef20d1c042086b8767bbe2bdb62526894275a1b062161b2e"
checksum = "4d12a7f4e95cfe710f1d624fb1210b7d961a5fb05c4fd942f4feab06e61f590e"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "gobject-sys"
version = "0.15.1"
version = "0.15.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6edb1f0b3e4c08e2a0a490d1082ba9e902cdff8ff07091e85c6caec60d17e2ab"
checksum = "a085ec9acece647f905b675705c349eb00acba30505f5cee43459bc3b2e968cc"
dependencies = [
"glib-sys",
"libc",
@ -638,13 +665,12 @@ dependencies = [
[[package]]
name = "gstreamer"
version = "0.18.2"
version = "0.18.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bc8364f9129c73e214ac1128fcff0e9a25cbc69f06d3ede9f7b07c8b6338680"
checksum = "5c1545fac08d7a28f8707101298cbf99d1bc72529698ff2d1fec87cc30a3fb9a"
dependencies = [
"bitflags",
"cfg-if",
"fragile",
"futures-channel",
"futures-core",
"futures-util",
@ -722,22 +748,23 @@ dependencies = [
[[package]]
name = "gstreamer-rtp"
version = "0.18.0"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43fb74e6cb6987651680767dc907017b1bb70518092336679fe9c3050ef1fb4"
checksum = "3d1a7fca987a2f01b555df2556ce7b64af546cb4a0c4376c84a816924278ae06"
dependencies = [
"bitflags",
"glib",
"gstreamer",
"gstreamer-rtp-sys",
"libc",
"once_cell",
]
[[package]]
name = "gstreamer-rtp-sys"
version = "0.18.0"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f859fa6194c2b348514f861dfacb7a153888bf0898a748797ce58cbab8fde0c7"
checksum = "2afde26d03b4146a39d07f576082f97b3775e7884ef3ccef5c4b7c0917e17469"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
@ -783,13 +810,12 @@ dependencies = [
[[package]]
name = "gstreamer-video"
version = "0.18.1"
version = "0.18.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410c72d885a67aeb7dbfa49c347e6c85d60f54e1cdaf6aadf8b5364892451261"
checksum = "bfbeef259fe286d6271402160daf7692b6bc56d52b2a9437dead797f3f60c222"
dependencies = [
"bitflags",
"cfg-if",
"fragile",
"futures-channel",
"glib",
"gstreamer",
@ -802,9 +828,9 @@ dependencies = [
[[package]]
name = "gstreamer-video-sys"
version = "0.18.0"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "255c487bf6dd145e23558eaf1c92ef0946ee1999d22bdadc1e492b463609c4b6"
checksum = "33331b1675e73b5b000c796354278eca7fdde9327015971d9f41afe28b96e0dc"
dependencies = [
"glib-sys",
"gobject-sys",
@ -816,9 +842,9 @@ dependencies = [
[[package]]
name = "gstreamer-webrtc"
version = "0.18.0"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83edc69fc8bcc242a6504b3974fbc26c2c9be20df39e67b13e808ce5d872f3e8"
checksum = "66be30e56113bcdf5acc151bd098fae4916a41f57dc99b570e14e4112111b723"
dependencies = [
"glib",
"gstreamer",
@ -829,9 +855,9 @@ dependencies = [
[[package]]
name = "gstreamer-webrtc-sys"
version = "0.18.0"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96ae4a4d636e74ef024f726a492ba3840102e9066b41cf4085b9d6b5e6efd030"
checksum = "1ea5f3bab2859f0b279edab6ea2620700259fd1bf91cf82185fe10a0fc5e96cc"
dependencies = [
"glib-sys",
"gstreamer-sdp-sys",
@ -845,15 +871,6 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.0"
@ -882,9 +899,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.5.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4"
[[package]]
name = "idna"
@ -948,9 +965,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.114"
version = "0.2.120"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50"
checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09"
[[package]]
name = "log"
@ -1050,15 +1067,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "openssl"
@ -1174,9 +1185,9 @@ checksum = "bc5c99d529f0d30937f6f4b8a86d988047327bb88d04d2c4afc356de74722131"
[[package]]
name = "proc-macro-crate"
version = "1.1.0"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83"
checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a"
dependencies = [
"thiserror",
"toml",
@ -1226,14 +1237,13 @@ dependencies = [
[[package]]
name = "rand"
version = "0.8.4"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
@ -1255,29 +1265,20 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.2.10"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"regex-syntax",
]
@ -1324,9 +1325,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.5.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d09d3c15d814eda1d6a836f2f2b56a6abc1446c8a34351cb3180d3db92ffe4ce"
checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc"
dependencies = [
"bitflags",
"core-foundation",
@ -1337,9 +1338,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.5.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e90dd10c41c6bfc633da6e0c659bd25d31e0791e5974ac42970267d59eba87f7"
checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556"
dependencies = [
"core-foundation-sys",
"libc",
@ -1347,9 +1348,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.135"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cf9235533494ea2ddcdb794665461814781c53f19d87b76e571a1c35acbad2b"
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
dependencies = [
"serde_derive",
]
@ -1365,9 +1366,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.135"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8dcde03d87d4c973c04be249e7d8f0b35db1c848c487bd43032808e59dd8328d"
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
dependencies = [
"proc-macro2",
"quote",
@ -1376,9 +1377,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085"
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
dependencies = [
"itoa",
"ryu",
@ -1387,15 +1388,13 @@ dependencies = [
[[package]]
name = "sha-1"
version = "0.9.8"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
@ -1440,9 +1439,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi",
@ -1456,9 +1455,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.86"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
dependencies = [
"proc-macro2",
"quote",
@ -1467,12 +1466,12 @@ dependencies = [
[[package]]
name = "system-deps"
version = "6.0.0"
version = "6.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1487aaddaacbc5d60a2a507ba1617c5ca66c57dd0dd07d0c5efd5b693841d4"
checksum = "a1a45a1c4c9015217e12347f2a411b57ce2c4fc543913b14b6fe40483328e709"
dependencies = [
"cfg-expr",
"heck 0.3.3",
"heck",
"pkg-config",
"toml",
"version-compare",
@ -1494,18 +1493,29 @@ dependencies = [
[[package]]
name = "termcolor"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.14.2"
name = "test-log"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
checksum = "eb78caec569a40f42c078c798c0e35b922d9054ec28e166f0d6ac447563d91a4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "textwrap"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
@ -1562,9 +1572,9 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.29"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [
"cfg-if",
"log",
@ -1575,9 +1585,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.18"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b"
dependencies = [
"proc-macro2",
"quote",
@ -1586,11 +1596,12 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.21"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
dependencies = [
"lazy_static",
"valuable",
]
[[package]]
@ -1606,9 +1617,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.6"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77be66445c4eeebb934a7340f227bfe7b338173d3f8c00a60a5a58005c9faecf"
checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce"
dependencies = [
"ansi_term",
"lazy_static",
@ -1624,9 +1635,9 @@ dependencies = [
[[package]]
name = "tungstenite"
version = "0.16.0"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1"
checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
@ -1663,12 +1674,6 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -1702,6 +1707,12 @@ dependencies = [
"getrandom",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-bag"
version = "1.0.0-alpha.8"
@ -1837,7 +1848,6 @@ dependencies = [
"gstreamer-webrtc",
"once_cell",
"serde",
"serde_derive",
"serde_json",
"smallvec",
"thiserror",
@ -1845,6 +1855,37 @@ dependencies = [
"tracing-log",
"tracing-subscriber",
"uuid",
"webrtcsink-protocol",
]
[[package]]
name = "webrtcsink-protocol"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "webrtcsink-signalling"
version = "0.1.0"
dependencies = [
"anyhow",
"async-native-tls 0.3.3",
"async-std",
"async-tungstenite",
"clap",
"futures",
"pin-project-lite",
"serde",
"serde_json",
"test-log",
"thiserror",
"tracing",
"tracing-log",
"tracing-subscriber",
"uuid",
"webrtcsink-protocol",
]
[[package]]

View file

@ -2,4 +2,6 @@
members = [
"plugins",
"protocol",
"signalling",
]

View file

@ -15,9 +15,9 @@ useful alternative.
`webrtcsink` implements the following features:
* Built-in signaller: when using the default signalling server (provided as a python
script [here](signalling/simple-server.py)), this element will perform signalling without
requiring application interaction. This makes it usable directly from `gst-launch`.
* Built-in signaller: when using the default signalling server, this element will
perform signalling without requiring application interaction.
This makes it usable directly from `gst-launch`.
* Application-provided signalling: `webrtcsink` can be instantiated by an application
with a custom signaller. That signaller must be a GObject, and must implement the
@ -90,8 +90,7 @@ cargo build
Open three terminals. In the first, run:
``` shell
cd signalling
python3 simple-server.py --addr=127.0.0.1 --disable-ssl
WEBRTCSINK_SIGNALLING_SERVER_LOG=debug cargo run --bin server
```
In the second, run:
@ -109,7 +108,7 @@ gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws.
```
When the pipeline above is running succesfully, open a browser and
point it to the python server:
point it to the http server:
``` shell
xdg-open http://127.0.0.1:8000

View file

@ -21,11 +21,11 @@ anyhow = "1"
thiserror = "1"
futures = "0.3"
async-std = { version = "1", features = ["unstable"] }
async-tungstenite = { version = "0.16", features = ["async-std-runtime", "async-native-tls"] }
async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] }
serde = "1"
serde_derive = "1"
serde_json = "1"
fastrand = "1.0"
webrtcsink-protocol = { version = "0.1", path="../protocol" }
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }

View file

@ -9,8 +9,8 @@ use gst::glib::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning};
use once_cell::sync::Lazy;
use serde_derive::{Deserialize, Serialize};
use std::sync::Mutex;
use webrtcsink_protocol as p;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -23,39 +23,11 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Default)]
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<WsMessage>>,
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
}
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "lowercase")]
enum SdpMessage {
Offer { sdp: String },
Answer { sdp: String },
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum JsonMsgInner {
Ice {
candidate: String,
#[serde(rename = "sdpMLineIndex")]
sdp_mline_index: u32,
},
Sdp(SdpMessage),
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
struct JsonMsg {
#[serde(rename = "peer-id")]
peer_id: String,
#[serde(flatten)]
inner: JsonMsgInner,
}
struct Settings {
address: Option<String>,
}
@ -92,20 +64,19 @@ impl Signaller {
// Channel for asynchronously sending out websocket message
let (mut ws_sink, mut ws_stream) = ws.split();
ws_sink
.send(WsMessage::Text("REGISTER PRODUCER".to_string()))
.await?;
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<WsMessage>(1000);
let (mut websocket_sender, mut websocket_receiver) =
mpsc::channel::<p::IncomingMessage>(1000);
let element_clone = element.downgrade();
let send_task_handle = task::spawn(async move {
while let Some(msg) = websocket_receiver.next().await {
if let Some(element) = element_clone.upgrade() {
gst_trace!(CAT, obj: &element, "Sending websocket message {:?}", msg);
}
ws_sink.send(msg).await?;
ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
}
if let Some(element) = element_clone.upgrade() {
@ -118,6 +89,10 @@ impl Signaller {
Ok::<(), Error>(())
});
websocket_sender
.send(p::IncomingMessage::Register(p::RegisterMessage::Producer))
.await?;
let element_clone = element.downgrade();
let receive_task_handle = task::spawn(async move {
while let Some(msg) = async_std::stream::StreamExt::next(&mut ws_stream).await {
@ -126,50 +101,78 @@ impl Signaller {
Ok(WsMessage::Text(msg)) => {
gst_trace!(CAT, obj: &element, "Received message {}", msg);
if msg.starts_with("REGISTERED ") {
gst_info!(CAT, obj: &element, "We are registered with the server");
} else if let Some(peer_id) = msg.strip_prefix("START_SESSION ") {
if let Err(err) = element.add_consumer(peer_id) {
gst_warning!(CAT, obj: &element, "{}", err);
}
} else if let Some(peer_id) = msg.strip_prefix("END_SESSION ") {
if let Err(err) = element.remove_consumer(peer_id) {
gst_warning!(CAT, obj: &element, "{}", err);
}
} else if let Ok(msg) = serde_json::from_str::<JsonMsg>(&msg) {
match msg.inner {
JsonMsgInner::Sdp(SdpMessage::Answer { sdp }) => {
if let Err(err) = element.handle_sdp(
&msg.peer_id,
&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.unwrap(),
),
) {
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Registered(
p::RegisteredMessage::Producer { peer_id },
) => {
gst_info!(
CAT,
obj: &element,
"We are registered with the server, our peer id is {}",
peer_id
);
}
p::OutgoingMessage::Registered(_) => unreachable!(),
p::OutgoingMessage::StartSession { peer_id } => {
if let Err(err) = element.add_consumer(&peer_id) {
gst_warning!(CAT, obj: &element, "{}", err);
}
}
JsonMsgInner::Sdp(SdpMessage::Offer { .. }) => {
p::OutgoingMessage::EndSession { peer_id } => {
if let Err(err) = element.remove_consumer(&peer_id) {
gst_warning!(CAT, obj: &element, "{}", err);
}
}
p::OutgoingMessage::Peer(p::PeerMessage {
peer_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => {
if let Err(err) = element.handle_sdp(
&peer_id,
&gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
gst_sdp::SDPMessage::parse_buffer(
sdp.as_bytes(),
)
.unwrap(),
),
) {
gst_warning!(CAT, obj: &element, "{}", err);
}
}
p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
..
}) => {
gst_warning!(
CAT,
obj: &element,
"Ignoring offer from peer"
);
}
p::PeerMessageInner::Ice {
candidate,
sdp_m_line_index,
} => {
if let Err(err) = element.handle_ice(
&peer_id,
Some(sdp_m_line_index),
None,
&candidate,
) {
gst_warning!(CAT, obj: &element, "{}", err);
}
}
},
_ => {
gst_warning!(
CAT,
obj: &element,
"Ignoring offer from peer"
"Ignoring unsupported message {:?}",
msg
);
}
JsonMsgInner::Ice {
candidate,
sdp_mline_index,
} => {
if let Err(err) = element.handle_ice(
&msg.peer_id,
Some(sdp_mline_index),
None,
&candidate,
) {
gst_warning!(CAT, obj: &element, "{}", err);
}
}
}
} else {
gst_error!(
@ -237,20 +240,17 @@ impl Signaller {
) {
let state = self.state.lock().unwrap();
let msg = JsonMsg {
let msg = p::IncomingMessage::Peer(p::PeerMessage {
peer_id: peer_id.to_string(),
inner: JsonMsgInner::Sdp(SdpMessage::Offer {
peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}),
};
});
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
task::spawn(async move {
if let Err(err) = sender
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await
{
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
@ -264,26 +264,23 @@ impl Signaller {
element: &WebRTCSink,
peer_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
let state = self.state.lock().unwrap();
let msg = JsonMsg {
let msg = p::IncomingMessage::Peer(p::PeerMessage {
peer_id: peer_id.to_string(),
inner: JsonMsgInner::Ice {
peer_message: p::PeerMessageInner::Ice {
candidate: candidate.to_string(),
sdp_mline_index: sdp_mline_index.unwrap(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
},
};
});
if let Some(mut sender) = state.websocket_sender.clone() {
let element = element.downgrade();
task::spawn(async move {
if let Err(err) = sender
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await
{
if let Err(err) = sender.send(msg).await {
if let Some(element) = element.upgrade() {
element.handle_signalling_error(anyhow!("Error: {}", err).into());
}
@ -324,7 +321,9 @@ impl Signaller {
if let Some(mut sender) = state.websocket_sender.clone() {
task::spawn(async move {
if let Err(err) = sender
.send(WsMessage::Text(format!("END_SESSION {}", peer_id)))
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
peer_id: peer_id.to_string(),
}))
.await
{
if let Some(element) = element.upgrade() {

View file

@ -1455,14 +1455,14 @@ impl WebRTCSink {
&self,
element: &super::WebRTCSink,
peer_id: String,
sdp_mline_index: u32,
sdp_m_line_index: u32,
candidate: String,
) {
let mut state = self.state.lock().unwrap();
if let Err(err) =
state
.signaller
.handle_ice(element, &peer_id, &candidate, Some(sdp_mline_index), None)
.handle_ice(element, &peer_id, &candidate, Some(sdp_m_line_index), None)
{
gst_warning!(
CAT,
@ -1516,12 +1516,12 @@ impl WebRTCSink {
webrtcbin.connect("on-ice-candidate", false, move |values| {
if let Some(element) = element_clone.upgrade() {
let this = Self::from_instance(&element);
let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
let sdp_m_line_index = values[1].get::<u32>().expect("Invalid argument");
let candidate = values[2].get::<String>().expect("Invalid argument");
this.on_ice_candidate(
&element,
peer_id_clone.to_string(),
sdp_mline_index,
sdp_m_line_index,
candidate,
);
}
@ -1864,19 +1864,19 @@ impl WebRTCSink {
&self,
_element: &super::WebRTCSink,
peer_id: &str,
sdp_mline_index: Option<u32>,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), WebRTCSinkError> {
let state = self.state.lock().unwrap();
let sdp_mline_index = sdp_mline_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?;
let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?;
if let Some(consumer) = state.consumers.get(peer_id) {
gst_trace!(CAT, "adding ice candidate for peer {}", peer_id);
consumer
.webrtcbin
.emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]);
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
Ok(())
} else {
Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string()))

View file

@ -41,14 +41,14 @@ pub trait Signallable: Sync + Send + 'static {
/// sdp_mid is exposed for future proofing, see
/// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
/// at the moment sdp_mline_index will always be Some and sdp_mid will always
/// at the moment sdp_m_line_index will always be Some and sdp_mid will always
/// be None
fn handle_ice(
&mut self,
element: &WebRTCSink,
peer_id: &str,
candidate: &str,
sdp_mline_index: Option<u32>,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) -> Result<(), Box<dyn Error>>;
@ -96,17 +96,17 @@ impl WebRTCSink {
/// sdp_mid is exposed for future proofing, see
/// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174,
/// at the moment sdp_mline_index must be Some
/// at the moment sdp_m_line_index must be Some
pub fn handle_ice(
&self,
peer_id: &str,
sdp_mline_index: Option<u32>,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
candidate: &str,
) -> Result<(), WebRTCSinkError> {
let ws = imp::WebRTCSink::from_instance(self);
ws.handle_ice(self, peer_id, sdp_mline_index, sdp_mid, candidate)
ws.handle_ice(self, peer_id, sdp_m_line_index, sdp_mid, candidate)
}
pub fn handle_signalling_error(&self, error: Box<dyn Error + Send + Sync>) {

12
protocol/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name="webrtcsink-protocol"
version = "0.1.0"
edition = "2018"
authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
license = "MIT"
description = "GStreamer WebRTC sink default protocol"
repository = "https://github.com/centricular/webrtcsink/"
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"

134
protocol/src/lib.rs Normal file
View file

@ -0,0 +1,134 @@
/// The default protocol used by the signalling server
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")]
/// Confirms registration
pub enum RegisteredMessage {
/// Registered as a producer
#[serde(rename_all = "camelCase")]
Producer { peer_id: String },
/// Registered as a consumer
#[serde(rename_all = "camelCase")]
Consumer { peer_id: String },
/// Registered as a listener
#[serde(rename_all = "camelCase")]
Listener { peer_id: String },
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Messages sent from the server to peers
pub enum OutgoingMessage {
/// Confirms registration
Registered(RegisteredMessage),
/// Notifies listeners that a producer was registered
#[serde(rename_all = "camelCase")]
ProducerAdded { peer_id: String },
/// Notifies listeners that a producer was removed
#[serde(rename_all = "camelCase")]
ProducerRemoved { peer_id: String },
/// Instructs a peer to generate an offer
#[serde(rename_all = "camelCase")]
StartSession { peer_id: String },
/// Signals that the session the peer was in was ended
#[serde(rename_all = "camelCase")]
EndSession { peer_id: String },
/// Messages directly forwarded from one peer to another
Peer(PeerMessage),
/// Provides the current list of consumer peers
List { producers: Vec<String> },
/// Notifies that an error occurred with the peer's current session
Error { details: String },
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "peerType")]
#[serde(rename_all = "camelCase")]
/// Register with a peer type
pub enum RegisterMessage {
/// Register as a producer
Producer,
/// Register as a consumer
Consumer,
/// Register as a listener
Listener,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
/// Ask the server to start a session with a producer peer
pub struct StartSessionMessage {
/// Identifies the peer
pub peer_id: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Conveys a SDP
pub enum SdpMessage {
/// Conveys an offer
Offer {
/// The SDP
sdp: String,
},
/// Conveys an answer
Answer {
/// The SDP
sdp: String,
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
/// Contents of the peer message
pub enum PeerMessageInner {
/// Conveys an ICE candidate
#[serde(rename_all = "camelCase")]
Ice {
/// The candidate string
candidate: String,
/// The mline index the candidate applies to
sdp_m_line_index: u32,
},
Sdp(SdpMessage),
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
/// Messages directly forwarded from one peer to another
pub struct PeerMessage {
/// The identifier of the peer, which must be in a session with the sender
pub peer_id: String,
/// The contents of the message
#[serde(flatten)]
pub peer_message: PeerMessageInner,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
/// End a session
pub struct EndSessionMessage {
/// The identifier of the peer to end the session with
pub peer_id: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
/// Messages received by the server from peers
pub enum IncomingMessage {
/// Register as a peer type
Register(RegisterMessage),
/// Start a session with a producer peer
StartSession(StartSessionMessage),
/// End an existing session
EndSession(EndSessionMessage),
/// Send a message to a peer the sender is currently in session with
Peer(PeerMessage),
/// Retrieve the current list of producers
List,
}

26
signalling/Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name="webrtcsink-signalling"
version = "0.1.0"
edition = "2018"
authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
license = "MIT"
description = "GStreamer WebRTC sink signalling server"
repository = "https://github.com/centricular/webrtcsink/"
[dependencies]
anyhow = "1"
async-std = { version = "1", features = ["unstable", "attributes"] }
async-native-tls = "0.3"
async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
clap = { version = "3", features = ["derive"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-log = "0.1"
futures = "0.3"
uuid = { version = "0.8", features = ["v4"] }
thiserror = "1"
test-log = { version = "0.2", features = ["trace"], default-features = false }
pin-project-lite = "0.2"
webrtcsink-protocol = { version = "0.1", path="../protocol" }

View file

@ -1,253 +0,0 @@
#!/usr/bin/env python3
#
# Example 1-1 call signalling server
#
# Copyright (C) 2017 Centricular Ltd.
#
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
#
import os
import sys
import ssl
import logging
import asyncio
import websockets
import argparse
import json
import uuid
import concurrent
from collections import defaultdict
from concurrent.futures._base import TimeoutError
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on')
parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
parser.add_argument('--disable-ssl', default=False, help='Disable ssl', action='store_true')
parser.add_argument('--hide-local-candidates', '-hd', default=False, help='Hide local Ice candidates', action='store_true')
options = parser.parse_args(sys.argv[1:])
ADDR_PORT = (options.addr, options.port)
KEEPALIVE_TIMEOUT = options.keepalive_timeout
logger = logging.getLogger('webrtc.signalling')
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
############### Global data ###############
# Format: {uid: (Peer WebSocketServerProtocol,
# remote_address,
# [<'session'>,]}
peers = dict()
# Format: {caller_uid: [callee_uid,],
# callee_uid: [caller_uid,]}
# Bidirectional mapping between the two peers
sessions = defaultdict(list)
producers = set()
consumers = set()
listeners = set()
############### Helper functions ###############
async def recv_msg_ping(ws, raddr):
'''
Wait for a message forever, and send a regular ping to prevent bad routers
from closing the connection.
'''
msg = None
while msg is None:
try:
msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
except (asyncio.TimeoutError, concurrent.futures._base.TimeoutError):
logger.debug('Sending keepalive ping to {!r} in recv'.format(raddr))
await ws.ping()
return msg
async def cleanup_session(uid):
for other_id in sessions[uid]:
await peers[other_id][0].send('END_SESSION {}'.format(uid))
sessions[other_id].remove(uid)
logger.info("Cleaned up {} -> {} session".format(uid, other_id))
del sessions[uid]
async def end_session(uid, other_id):
if not other_id in sessions[uid]:
return
if not uid in sessions[other_id]:
return
if not other_id in peers:
return
await peers[other_id][0].send('END_SESSION {}'.format(uid))
sessions[uid].remove(other_id)
sessions[other_id].remove(uid)
async def remove_peer(uid):
await cleanup_session(uid)
if uid in peers:
ws, raddr, status = peers[uid]
del peers[uid]
await ws.close()
logger.info("Disconnected from peer {!r} at {!r}".format(uid, raddr))
if uid in producers:
for peer_id in listeners:
await peers[peer_id][0].send('REMOVE PRODUCER {}'.format(uid))
if uid in producers:
producers.remove(uid)
elif uid in consumers:
consumers.remove(uid)
elif uid in listeners:
listeners.remove(uid)
############### Handler functions ###############
async def connection_handler(ws, uid):
global peers, sessions
raddr = ws.remote_address
peers_status = []
peers[uid] = [ws, raddr, peers_status]
logger.info("Registered peer {!r} at {!r}".format(uid, raddr))
while True:
# Receive command, wait forever if necessary
msg = await recv_msg_ping(ws, raddr)
# Update current status
peers_status = peers[uid][2]
# Requested a session with a specific peer
if msg.startswith('START_SESSION '):
logger.info("{!r} command {!r}".format(uid, msg))
_, callee_id = msg.split(maxsplit=1)
if callee_id not in peers:
await ws.send('ERROR peer {!r} not found'.format(callee_id))
continue
wsc = peers[callee_id][0]
await wsc.send('START_SESSION {}'.format(uid))
logger.info('Session from {!r} ({!r}) to {!r} ({!r})'
''.format(uid, raddr, callee_id, wsc.remote_address))
# Register session
peers[uid][2] = peer_status = 'session'
sessions[uid].append(callee_id)
peers[callee_id][2] = 'session'
sessions[callee_id] .append(uid)
elif msg.startswith('END_SESSION '):
_, peer_id = msg.split(maxsplit=1)
await end_session(uid, peer_id)
elif msg.startswith('LIST'):
answer = ' '.join(['LIST'] + [other_id for other_id in peers if other_id in producers])
await ws.send(answer)
# We are in a session, messages must be relayed
else:
# We're in a session, route message to connected peer
msg = json.loads(msg)
if options.hide_local_candidates:
candidate = msg.get("ice", {}).get("candidate")
if candidate:
if candidate.split()[4].endswith(".local"):
logger.info(f"Ignoring local candidate: {candidate}")
continue
other_id = msg['peer-id']
try:
wso, oaddr, status = peers[other_id]
except KeyError:
continue
msg['peer-id'] = uid
msg = json.dumps(msg)
logger.debug("Got peer: {} -> {}: {}".format(uid, other_id, msg))
await wso.send(msg)
async def register_peer(ws):
'''
Register peer
'''
raddr = ws.remote_address
msg = await ws.recv()
cmd, typ = msg.split(maxsplit=1)
uid = str(uuid.uuid4())
while uid in peers:
uid = str(uuid.uuid4())
if cmd != 'REGISTER':
await ws.close(code=1002, reason='invalid protocol')
raise Exception("Invalid registration from {!r}".format(raddr))
if typ not in ('PRODUCER', 'CONSUMER', 'LISTENER'):
await ws.close(code=1002, reason='invalid protocol')
raise Exception("Invalid registration from {!r}".format(raddr))
# Send back a HELLO
await ws.send('REGISTERED {}'.format(uid))
return typ, uid
async def handler(ws, path):
'''
All incoming messages are handled here. @path is unused.
'''
raddr = ws.remote_address
logger.info("Connected to {!r}".format(raddr))
try:
typ, peer_id = await register_peer(ws)
except:
return
if typ == 'PRODUCER':
for other_id in listeners:
await peers[other_id][0].send('ADD PRODUCER {}'.format(peer_id))
producers.add(peer_id)
elif typ == 'CONSUMER':
consumers.add(peer_id)
elif typ == 'LISTENER':
listeners.add(peer_id)
try:
await connection_handler(ws, peer_id)
except websockets.ConnectionClosed:
logger.info("Connection to peer {!r} closed, exiting handler".format(raddr))
finally:
await remove_peer(peer_id)
if options.disable_ssl:
sslctx = None
else:
# Create an SSL context to be used by the websocket server
certpath = '.'
chain_pem = os.path.join(certpath, 'cert.pem')
key_pem = os.path.join(certpath, 'key.pem')
sslctx = ssl.create_default_context()
try:
sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
except FileNotFoundError:
logger.error("Certificates not found, did you run generate_cert.sh?")
sys.exit(1)
# FIXME
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
logger.info("Listening on wss://{}:{}".format(*ADDR_PORT))
# Websocket server
wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx,
# Maximum number of messages that websockets will pop
# off the asyncio and OS buffers per connection. See:
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
max_queue=16)
ws_logger = logging.getLogger('websockets.server')
ws_logger.setLevel(logging.ERROR)
ws_logger.addHandler(logging.StreamHandler())
asyncio.get_event_loop().run_until_complete(wsd)
asyncio.get_event_loop().run_forever()

View file

@ -0,0 +1,101 @@
use async_std::task;
use clap::Parser;
use tracing_subscriber::prelude::*;
use webrtcsink_signalling::handlers::Handler;
use webrtcsink_signalling::server::Server;
use anyhow::Error;
use async_native_tls::TlsAcceptor;
use async_std::fs::File as AsyncFile;
use async_std::net::TcpListener;
use tracing::{info, warn};
#[derive(Parser, Debug)]
#[clap(about, version, author)]
/// Program arguments
struct Args {
/// Address to listen on
#[clap(short, long, default_value = "0.0.0.0")]
host: String,
/// Port to listen on
#[clap(short, long, default_value_t = 8443)]
port: u16,
/// TLS certificate to use
#[clap(short, long)]
cert: Option<String>,
/// password to TLS certificate
#[clap(long)]
cert_password: Option<String>,
}
fn initialize_logging(envvar_name: &str) -> Result<(), Error> {
tracing_log::LogTracer::init()?;
let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_target(true)
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::NEW
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(fmt_layer);
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
fn main() -> Result<(), Error> {
let args = Args::parse();
let server = Server::spawn(|stream| Handler::new(stream));
initialize_logging("WEBRTCSINK_SIGNALLING_SERVER_LOG")?;
task::block_on(async move {
let addr = format!("{}:{}", args.host, args.port);
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&addr).await?;
let acceptor = match args.cert {
Some(cert) => {
let key = AsyncFile::open(cert).await?;
Some(TlsAcceptor::new(key, args.cert_password.as_deref().unwrap_or("")).await?)
}
None => None,
};
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let mut server_clone = server.clone();
let address = match stream.peer_addr() {
Ok(address) => address,
Err(err) => {
warn!("Connected peer with no address: {}", err);
continue;
}
};
info!("Accepting connection from {}", address);
if let Some(ref acceptor) = acceptor {
let stream = match acceptor.accept(stream).await {
Ok(stream) => stream,
Err(err) => {
warn!("Failed to accept TLS connection from {}: {}", address, err);
continue;
}
};
task::spawn(async move { server_clone.accept_async(stream).await });
} else {
task::spawn(async move { server_clone.accept_async(stream).await });
}
}
Ok(())
})
}

File diff suppressed because it is too large Load diff

2
signalling/src/lib.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod handlers;
pub mod server;

View file

@ -0,0 +1,182 @@
use anyhow::Error;
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::{AsyncRead, AsyncWrite};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tracing::{info, instrument, trace, warn};
struct Peer {
receive_task_handle: task::JoinHandle<()>,
send_task_handle: task::JoinHandle<Result<(), Error>>,
sender: mpsc::Sender<String>,
}
struct State {
tx: Option<mpsc::Sender<(String, Option<String>)>>,
peers: HashMap<String, Peer>,
}
#[derive(Clone)]
pub struct Server {
state: Arc<Mutex<State>>,
}
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
#[error("error during handshake {0}")]
Handshake(#[from] async_tungstenite::tungstenite::Error),
}
impl Server {
#[instrument(level = "debug", skip(factory))]
pub fn spawn<
I: for<'a> Deserialize<'a>,
O: Serialize + std::fmt::Debug,
Factory: FnOnce(Pin<Box<dyn Stream<Item = (String, Option<I>)> + Send>>) -> St,
St: Stream<Item = (String, O)>,
>(
factory: Factory,
) -> Self
where
O: Serialize + std::fmt::Debug,
St: Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel::<(String, Option<String>)>(1000);
let mut handler = factory(Box::pin(rx.filter_map(|(peer_id, msg)| async move {
if let Some(msg) = msg {
match serde_json::from_str::<I>(&msg) {
Ok(msg) => Some((peer_id, Some(msg))),
Err(err) => {
warn!("Failed to parse incoming message: {}", err);
None
}
}
} else {
Some((peer_id, None))
}
})));
let state = Arc::new(Mutex::new(State {
tx: Some(tx),
peers: HashMap::new(),
}));
let state_clone = state.clone();
let _ = task::spawn(async move {
while let Some((peer_id, msg)) = handler.next().await {
match serde_json::to_string(&msg) {
Ok(msg) => {
if let Some(peer) = state_clone.lock().unwrap().peers.get_mut(&peer_id) {
let mut sender = peer.sender.clone();
task::spawn(async move {
let _ = sender.send(msg).await;
});
}
}
Err(err) => {
warn!("Failed to serialize outgoing message: {}", err);
}
}
}
});
Self { state }
}
#[instrument(level = "debug", skip(state))]
fn remove_peer(state: Arc<Mutex<State>>, peer_id: &str) {
if let Some(mut peer) = state.lock().unwrap().peers.remove(peer_id) {
let peer_id = peer_id.to_string();
task::spawn(async move {
peer.sender.close_channel();
if let Err(err) = peer.send_task_handle.await {
trace!(peer_id = %peer_id, "Error while joining send task: {}", err);
}
peer.receive_task_handle.await;
});
}
}
#[instrument(level = "debug", skip(self, stream))]
pub async fn accept_async<S: 'static>(&mut self, stream: S) -> Result<String, ServerError>
where
S: AsyncRead + AsyncWrite + Unpin + Send,
{
let ws = match async_tungstenite::accept_async(stream).await {
Ok(ws) => ws,
Err(err) => {
warn!("Error during the websocket handshake: {}", err);
return Err(ServerError::Handshake(err));
}
};
let this_id = uuid::Uuid::new_v4().to_string();
info!(this_id = %this_id, "New WebSocket connection");
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<String>(1000);
let this_id_clone = this_id.clone();
let (mut ws_sink, mut ws_stream) = ws.split();
let send_task_handle = task::spawn(async move {
while let Some(msg) = websocket_receiver.next().await {
trace!(this_id = %this_id_clone, "sending {}", msg);
ws_sink.send(WsMessage::Text(msg)).await?;
}
ws_sink.send(WsMessage::Close(None)).await?;
ws_sink.close().await?;
Ok::<(), Error>(())
});
let mut tx = self.state.lock().unwrap().tx.clone();
let this_id_clone = this_id.clone();
let state_clone = self.state.clone();
let receive_task_handle = task::spawn(async move {
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(WsMessage::Text(msg)) => {
if let Some(tx) = tx.as_mut() {
if let Err(err) = tx.send((this_id_clone.clone(), Some(msg))).await {
warn!(this = %this_id_clone, "Error handling message: {:?}", err);
}
}
}
Ok(WsMessage::Close(reason)) => {
info!(this_id = %this_id_clone, "connection closed: {:?}", reason);
break;
}
Ok(_) => warn!(this_id = %this_id_clone, "Unsupported message type"),
Err(err) => {
warn!(this_id = %this_id_clone, "recv error: {}", err);
break;
}
}
}
if let Some(tx) = tx.as_mut() {
let _ = tx.send((this_id_clone.clone(), None)).await;
}
Self::remove_peer(state_clone, &this_id_clone);
});
self.state.lock().unwrap().peers.insert(
this_id.clone(),
Peer {
receive_task_handle,
send_task_handle,
sender: websocket_sender,
},
);
Ok(this_id)
}
}

View file

@ -111,7 +111,11 @@ function Session(our_id, peer_id, closed_callback) {
var thiz = this;
this.peer_connection.setLocalDescription(desc).then(() => {
this.setStatus("Sending SDP answer");
var sdp = {'peer-id': this.peer_id, 'sdp': this.peer_connection.localDescription.toJSON()}
var sdp = {
'type': 'peer',
'peerId': this.peer_id,
'sdp': this.peer_connection.localDescription.toJSON()
};
this.ws_conn.send(JSON.stringify(sdp));
}).catch(function(e) {
thiz.setError(e);
@ -146,26 +150,27 @@ function Session(our_id, peer_id, closed_callback) {
this.onServerMessage = function(event) {
console.log("Received " + event.data);
if (event.data.startsWith("REGISTERED")) {
try {
msg = JSON.parse(event.data);
} catch (e) {
if (e instanceof SyntaxError) {
this.handleIncomingError("Error parsing incoming JSON: " + event.data);
} else {
this.handleIncomingError("Unknown error parsing response: " + event.data);
}
return;
}
if (msg.type == "registered") {
this.setStatus("Registered with server");
this.connectPeer();
} else if (event.data.startsWith("ERROR")) {
this.handleIncomingError(event.data);
return;
} else {
// Handle incoming JSON SDP and ICE messages
try {
msg = JSON.parse(event.data);
} catch (e) {
if (e instanceof SyntaxError) {
this.handleIncomingError("Error parsing incoming JSON: " + event.data);
} else {
this.handleIncomingError("Unknown error parsing response: " + event.data);
}
return;
}
// Incoming JSON signals the beginning of a call
} else if (msg.type == "error") {
this.handleIncomingError(msg.details);
} else if (msg.type == "endSession") {
this.resetState();
this.closed_callback(this.our_id);
} else if (msg.type == "peer") {
// Incoming peer message signals the beginning of a call
if (!this.peer_connection)
this.createCall(msg);
@ -211,7 +216,10 @@ function Session(our_id, peer_id, closed_callback) {
this.ws_conn = new WebSocket(ws_url);
/* When connected, immediately register with the server */
this.ws_conn.addEventListener('open', (event) => {
this.ws_conn.send('REGISTER CONSUMER');
this.ws_conn.send(JSON.stringify({
"type": "register",
"peerType": "consumer"
}));
this.setStatus("Registering with server");
});
this.ws_conn.addEventListener('error', this.onServerError.bind(this));
@ -222,7 +230,10 @@ function Session(our_id, peer_id, closed_callback) {
this.connectPeer = function() {
this.setStatus("Connecting " + this.peer_id);
this.ws_conn.send("START_SESSION " + this.peer_id);
this.ws_conn.send(JSON.stringify({
"type": "startSession",
"peerId": this.peer_id
}));
};
this.onRemoteStreamAdded = function(event) {
@ -295,16 +306,16 @@ function Session(our_id, peer_id, closed_callback) {
}
}
if (!msg.sdp) {
console.log("WARNING: First message wasn't an SDP message!?");
}
this.peer_connection.onicecandidate = (event) => {
if (event.candidate == null) {
console.log("ICE Candidate was null, done");
return;
}
this.ws_conn.send(JSON.stringify({'ice': event.candidate.toJSON(), 'peer-id': this.peer_id}));
this.ws_conn.send(JSON.stringify({
"type": "peer",
"peerId": this.peer_id,
"ice": event.candidate.toJSON()
}));
};
this.setStatus("Created peer connection for call, waiting for SDP");
@ -354,23 +365,34 @@ function clearPeers() {
function onServerMessage(event) {
console.log("Received " + event.data);
if (event.data.startsWith("REGISTERED ")) {
console.log("Listener registered");
ws_conn.send('LIST');
} else if (event.data.startsWith('LIST')) {
var split = event.data.split(' ');
clearPeers();
for (var i = 1; i < split.length; i++) {
addPeer(split[i]);
try {
msg = JSON.parse(event.data);
} catch (e) {
if (e instanceof SyntaxError) {
this.handleIncomingError("Error parsing incoming JSON: " + event.data);
} else {
this.handleIncomingError("Unknown error parsing response: " + event.data);
}
} else if (event.data.startsWith('ADD PRODUCER')) {
var split = event.data.split(' ');
addPeer(split[2]);
} else if (event.data.startsWith('REMOVE PRODUCER')) {
var split = event.data.split(' ');
var li = document.getElementById("peer-" + split[2]);
return;
}
if (msg.type == "registered") {
ws_conn.send(JSON.stringify({
"type": "list"
}));
} else if (msg.type == "list") {
clearPeers();
for (i = 0; i < msg.producers.length; i++) {
addPeer(msg.producers[i]);
}
} else if (msg.type == "producerAdded") {
addPeer(msg.peerId);
} else if (msg.type == "producerRemoved") {
var li = document.getElementById("peer-" + msg.peerId);
li.parentNode.removeChild(li);
} else {
this.handleIncomingError("Unsupported message: ", msg);
}
};
@ -408,7 +430,10 @@ function connect() {
console.log("Connecting listener");
ws_conn = new WebSocket(ws_url);
ws_conn.addEventListener('open', (event) => {
ws_conn.send('REGISTER LISTENER');
ws_conn.send(JSON.stringify({
"type": "register",
"peerType": "listener"
}));
});
ws_conn.addEventListener('error', onServerError);
ws_conn.addEventListener('message', onServerMessage);