diff --git a/Cargo.lock b/Cargo.lock index 11a1b3e7..57e21720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,9 +13,19 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.53" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" + +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn", +] [[package]] name = "async-channel" @@ -44,9 +54,9 @@ dependencies = [ [[package]] name = "async-global-executor" -version = "2.0.2" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" +checksum = "c026b7e44f1316b567ee750fea85103f87fcb80792b860e979f221259796ca0a" dependencies = [ "async-channel", "async-executor", @@ -79,9 +89,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" dependencies = [ "event-listener", ] @@ -107,6 +117,18 @@ dependencies = [ "url", ] +[[package]] +name = "async-native-tls" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d57d4cec3c647232e1094dc013546c0b33ce785d8aeb251e1f20dfaf8a9a13fe" +dependencies = [ + "futures-util", + "native-tls", + "thiserror", + "url", +] + [[package]] name = "async-process" version = "1.3.0" @@ -130,6 +152,7 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" dependencies = [ + "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -154,17 +177,17 @@ dependencies = [ [[package]] name = "async-task" -version = "4.0.3" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" +checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" [[package]] name = "async-tungstenite" -version = "0.16.1" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5682ea0913e5c20780fe5785abacb85a411e7437bf52a1bedb93ddb3972cb8dd" +checksum = "7922abeade7dd8948c20dfa1f85dc48cc952d2e0791f7c42b8b1cbb07a57129d" dependencies = [ - "async-native-tls", + "async-native-tls 0.4.0", "async-std", "futures-io", "futures-util", @@ -192,9 +215,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" @@ -210,18 +233,18 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "block-buffer" -version = "0.9.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324" dependencies = [ "generic-array", ] [[package]] name = "blocking" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" +checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc" dependencies = [ "async-channel", "async-task", @@ -257,15 +280,15 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" [[package]] name = "cc" -version = "1.0.72" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" [[package]] name = "cfg-expr" -version = "0.9.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edae0b9625d1fce32f7d64b71784d9b1bf8469ec1a9c417e44aaf16a9cbd7571" +checksum = "5e068cb2806bbc15b439846dc16c5f89f8599f2c3e4d73d4449d38f9b2f0b6c5" dependencies = [ "smallvec", ] @@ -290,9 +313,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.0.12" +version = "3.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2afefa54b5c7dd40918dc1e09f213a171ab5937aadccab45e804780b238f9f43" +checksum = "d8c93436c21e4698bacadf42917db28b23017027a4deccb35dbe47a7e7840123" dependencies = [ "atty", "bitflags", @@ -307,11 +330,11 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.0.12" +version = "3.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fd2078197a22f338bd4fbf7d6387eb6f0d6a3c69e6cbc09f5c93e97321fd92a" +checksum = "da95d038ede1a964ce99f49cbe27a7fb538d1da595e4b4f70b8c8f338d17bf16" dependencies = [ - "heck 0.4.0", + "heck", "proc-macro-error", "proc-macro2", "quote", @@ -329,9 +352,9 @@ dependencies = [ [[package]] name = "core-foundation" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" dependencies = [ "core-foundation-sys", "libc", @@ -354,14 +377,24 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.6" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" dependencies = [ "cfg-if", "lazy_static", ] +[[package]] +name = "crypto-common" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "ctor" version = "0.1.21" @@ -374,11 +407,12 @@ dependencies = [ [[package]] name = "digest" -version = "0.9.0" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ - "generic-array", + "block-buffer", + "crypto-common", ] [[package]] @@ -427,17 +461,11 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fragile" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da1b8f89c5b5a5b7e59405cfcf0bb9588e5ed19f0b57a4cd542bbba3f164a6d" - [[package]] name = "futures" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" dependencies = [ "futures-channel", "futures-core", @@ -450,9 +478,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", "futures-sink", @@ -460,15 +488,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" [[package]] name = "futures-executor" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" dependencies = [ "futures-core", "futures-task", @@ -477,9 +505,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" [[package]] name = "futures-lite" @@ -498,9 +526,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2", "quote", @@ -509,21 +537,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ "futures-channel", "futures-core", @@ -549,9 +577,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" dependencies = [ "cfg-if", "libc", @@ -560,9 +588,9 @@ dependencies = [ [[package]] name = "glib" -version = "0.15.3" +version = "0.15.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a703581e2538fe699c5476cf26b456d694c5272b6e999d3ab47711c5eaa2dd2" +checksum = "89528258cfdc79b1e54591202705be67896ad254f99e3cc2ea3710e0307148f2" dependencies = [ "bitflags", "futures-channel", @@ -580,12 +608,12 @@ dependencies = [ [[package]] name = "glib-macros" -version = "0.15.3" +version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58b262ff65ef771003873cea8c10e0fe854f1c508d48d62a4111a1ff163f7d1" +checksum = "41bfd8d227dead0829ac142454e97531b93f576d0805d779c42bfd799c65c572" dependencies = [ "anyhow", - "heck 0.4.0", + "heck", "proc-macro-crate", "proc-macro-error", "proc-macro2", @@ -595,9 +623,9 @@ dependencies = [ [[package]] name = "glib-sys" -version = "0.15.1" +version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c668102c6e15e0a7f6b99b59f602c2e806967bb86414f617b77e19b1de5b3fac" +checksum = "19289e4953dad38c9fea1c5c52cc594f29afc4a70802822ef382dca356b27bfd" dependencies = [ "libc", "system-deps", @@ -605,22 +633,21 @@ dependencies = [ [[package]] name = "gloo-timers" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f16c88aa13d2656ef20d1c042086b8767bbe2bdb62526894275a1b062161b2e" +checksum = "4d12a7f4e95cfe710f1d624fb1210b7d961a5fb05c4fd942f4feab06e61f590e" dependencies = [ "futures-channel", "futures-core", "js-sys", "wasm-bindgen", - "web-sys", ] [[package]] name = "gobject-sys" -version = "0.15.1" +version = "0.15.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6edb1f0b3e4c08e2a0a490d1082ba9e902cdff8ff07091e85c6caec60d17e2ab" +checksum = "a085ec9acece647f905b675705c349eb00acba30505f5cee43459bc3b2e968cc" dependencies = [ "glib-sys", "libc", @@ -638,13 +665,12 @@ dependencies = [ [[package]] name = "gstreamer" -version = "0.18.2" +version = "0.18.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bc8364f9129c73e214ac1128fcff0e9a25cbc69f06d3ede9f7b07c8b6338680" +checksum = "5c1545fac08d7a28f8707101298cbf99d1bc72529698ff2d1fec87cc30a3fb9a" dependencies = [ "bitflags", "cfg-if", - "fragile", "futures-channel", "futures-core", "futures-util", @@ -722,22 +748,23 @@ dependencies = [ [[package]] name = "gstreamer-rtp" -version = "0.18.0" +version = "0.18.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43fb74e6cb6987651680767dc907017b1bb70518092336679fe9c3050ef1fb4" +checksum = "3d1a7fca987a2f01b555df2556ce7b64af546cb4a0c4376c84a816924278ae06" dependencies = [ "bitflags", "glib", "gstreamer", "gstreamer-rtp-sys", + "libc", "once_cell", ] [[package]] name = "gstreamer-rtp-sys" -version = "0.18.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f859fa6194c2b348514f861dfacb7a153888bf0898a748797ce58cbab8fde0c7" +checksum = "2afde26d03b4146a39d07f576082f97b3775e7884ef3ccef5c4b7c0917e17469" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -783,13 +810,12 @@ dependencies = [ [[package]] name = "gstreamer-video" -version = "0.18.1" +version = "0.18.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "410c72d885a67aeb7dbfa49c347e6c85d60f54e1cdaf6aadf8b5364892451261" +checksum = "bfbeef259fe286d6271402160daf7692b6bc56d52b2a9437dead797f3f60c222" dependencies = [ "bitflags", "cfg-if", - "fragile", "futures-channel", "glib", "gstreamer", @@ -802,9 +828,9 @@ dependencies = [ [[package]] name = "gstreamer-video-sys" -version = "0.18.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "255c487bf6dd145e23558eaf1c92ef0946ee1999d22bdadc1e492b463609c4b6" +checksum = "33331b1675e73b5b000c796354278eca7fdde9327015971d9f41afe28b96e0dc" dependencies = [ "glib-sys", "gobject-sys", @@ -816,9 +842,9 @@ dependencies = [ [[package]] name = "gstreamer-webrtc" -version = "0.18.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83edc69fc8bcc242a6504b3974fbc26c2c9be20df39e67b13e808ce5d872f3e8" +checksum = "66be30e56113bcdf5acc151bd098fae4916a41f57dc99b570e14e4112111b723" dependencies = [ "glib", "gstreamer", @@ -829,9 +855,9 @@ dependencies = [ [[package]] name = "gstreamer-webrtc-sys" -version = "0.18.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ae4a4d636e74ef024f726a492ba3840102e9066b41cf4085b9d6b5e6efd030" +checksum = "1ea5f3bab2859f0b279edab6ea2620700259fd1bf91cf82185fe10a0fc5e96cc" dependencies = [ "glib-sys", "gstreamer-sdp-sys", @@ -845,15 +871,6 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.0" @@ -882,9 +899,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" [[package]] name = "idna" @@ -948,9 +965,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.114" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50" +checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" [[package]] name = "log" @@ -1050,15 +1067,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" - -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "openssl" @@ -1174,9 +1185,9 @@ checksum = "bc5c99d529f0d30937f6f4b8a86d988047327bb88d04d2c4afc356de74722131" [[package]] name = "proc-macro-crate" -version = "1.1.0" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" +checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a" dependencies = [ "thiserror", "toml", @@ -1226,14 +1237,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", "rand_core", - "rand_hc", ] [[package]] @@ -1255,29 +1265,20 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core", -] - [[package]] name = "redox_syscall" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" dependencies = [ "bitflags", ] [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "regex-syntax", ] @@ -1324,9 +1325,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.5.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09d3c15d814eda1d6a836f2f2b56a6abc1446c8a34351cb3180d3db92ffe4ce" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" dependencies = [ "bitflags", "core-foundation", @@ -1337,9 +1338,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.5.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e90dd10c41c6bfc633da6e0c659bd25d31e0791e5974ac42970267d59eba87f7" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" dependencies = [ "core-foundation-sys", "libc", @@ -1347,9 +1348,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.135" +version = "1.0.136" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cf9235533494ea2ddcdb794665461814781c53f19d87b76e571a1c35acbad2b" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" dependencies = [ "serde_derive", ] @@ -1365,9 +1366,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.135" +version = "1.0.136" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dcde03d87d4c973c04be249e7d8f0b35db1c848c487bd43032808e59dd8328d" +checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" dependencies = [ "proc-macro2", "quote", @@ -1376,9 +1377,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085" +checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" dependencies = [ "itoa", "ryu", @@ -1387,15 +1388,13 @@ dependencies = [ [[package]] name = "sha-1" -version = "0.9.8" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ - "block-buffer", "cfg-if", "cpufeatures", "digest", - "opaque-debug", ] [[package]] @@ -1440,9 +1439,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" dependencies = [ "libc", "winapi", @@ -1456,9 +1455,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" dependencies = [ "proc-macro2", "quote", @@ -1467,12 +1466,12 @@ dependencies = [ [[package]] name = "system-deps" -version = "6.0.0" +version = "6.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1487aaddaacbc5d60a2a507ba1617c5ca66c57dd0dd07d0c5efd5b693841d4" +checksum = "a1a45a1c4c9015217e12347f2a411b57ce2c4fc543913b14b6fe40483328e709" dependencies = [ "cfg-expr", - "heck 0.3.3", + "heck", "pkg-config", "toml", "version-compare", @@ -1494,18 +1493,29 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" dependencies = [ "winapi-util", ] [[package]] -name = "textwrap" -version = "0.14.2" +name = "test-log" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80" +checksum = "eb78caec569a40f42c078c798c0e35b922d9054ec28e166f0d6ac447563d91a4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "textwrap" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" @@ -1562,9 +1572,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.29" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" dependencies = [ "cfg-if", "log", @@ -1575,9 +1585,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.18" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" dependencies = [ "proc-macro2", "quote", @@ -1586,11 +1596,12 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.21" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" dependencies = [ "lazy_static", + "valuable", ] [[package]] @@ -1606,9 +1617,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77be66445c4eeebb934a7340f227bfe7b338173d3f8c00a60a5a58005c9faecf" +checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce" dependencies = [ "ansi_term", "lazy_static", @@ -1624,9 +1635,9 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.16.0" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" dependencies = [ "base64", "byteorder", @@ -1663,12 +1674,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" - [[package]] name = "unicode-xid" version = "0.2.2" @@ -1702,6 +1707,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.0.0-alpha.8" @@ -1837,7 +1848,6 @@ dependencies = [ "gstreamer-webrtc", "once_cell", "serde", - "serde_derive", "serde_json", "smallvec", "thiserror", @@ -1845,6 +1855,37 @@ dependencies = [ "tracing-log", "tracing-subscriber", "uuid", + "webrtcsink-protocol", +] + +[[package]] +name = "webrtcsink-protocol" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "webrtcsink-signalling" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-native-tls 0.3.3", + "async-std", + "async-tungstenite", + "clap", + "futures", + "pin-project-lite", + "serde", + "serde_json", + "test-log", + "thiserror", + "tracing", + "tracing-log", + "tracing-subscriber", + "uuid", + "webrtcsink-protocol", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 01458b90..b8454061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,4 +2,6 @@ members = [ "plugins", + "protocol", + "signalling", ] diff --git a/README.md b/README.md index 05f87bb8..2973f810 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,9 @@ useful alternative. `webrtcsink` implements the following features: -* Built-in signaller: when using the default signalling server (provided as a python - script [here](signalling/simple-server.py)), this element will perform signalling without - requiring application interaction. This makes it usable directly from `gst-launch`. +* Built-in signaller: when using the default signalling server, this element will + perform signalling without requiring application interaction. + This makes it usable directly from `gst-launch`. * Application-provided signalling: `webrtcsink` can be instantiated by an application with a custom signaller. That signaller must be a GObject, and must implement the @@ -90,8 +90,7 @@ cargo build Open three terminals. In the first, run: ``` shell -cd signalling -python3 simple-server.py --addr=127.0.0.1 --disable-ssl +WEBRTCSINK_SIGNALLING_SERVER_LOG=debug cargo run --bin server ``` In the second, run: @@ -109,7 +108,7 @@ gst-launch-1.0 webrtcsink name=ws videotestsrc ! ws. audiotestsrc ! ws. ``` When the pipeline above is running succesfully, open a browser and -point it to the python server: +point it to the http server: ``` shell xdg-open http://127.0.0.1:8000 diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 015c4e12..5cf7176b 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -21,11 +21,11 @@ anyhow = "1" thiserror = "1" futures = "0.3" async-std = { version = "1", features = ["unstable"] } -async-tungstenite = { version = "0.16", features = ["async-std-runtime", "async-native-tls"] } +async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] } serde = "1" -serde_derive = "1" serde_json = "1" fastrand = "1.0" +webrtcsink-protocol = { version = "0.1", path="../protocol" } [dev-dependencies] tracing = { version = "0.1", features = ["log"] } diff --git a/plugins/src/signaller/imp.rs b/plugins/src/signaller/imp.rs index 6eafd3cd..b0055e98 100644 --- a/plugins/src/signaller/imp.rs +++ b/plugins/src/signaller/imp.rs @@ -9,8 +9,8 @@ use gst::glib::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning}; use once_cell::sync::Lazy; -use serde_derive::{Deserialize, Serialize}; use std::sync::Mutex; +use webrtcsink_protocol as p; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -23,39 +23,11 @@ static CAT: Lazy = Lazy::new(|| { #[derive(Default)] struct State { /// Sender for the websocket messages - websocket_sender: Option>, + websocket_sender: Option>, send_task_handle: Option>>, receive_task_handle: Option>, } -#[derive(Serialize, Deserialize)] -#[serde(tag = "type")] -#[serde(rename_all = "lowercase")] -enum SdpMessage { - Offer { sdp: String }, - Answer { sdp: String }, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -enum JsonMsgInner { - Ice { - candidate: String, - #[serde(rename = "sdpMLineIndex")] - sdp_mline_index: u32, - }, - Sdp(SdpMessage), -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -struct JsonMsg { - #[serde(rename = "peer-id")] - peer_id: String, - #[serde(flatten)] - inner: JsonMsgInner, -} - struct Settings { address: Option, } @@ -92,20 +64,19 @@ impl Signaller { // Channel for asynchronously sending out websocket message let (mut ws_sink, mut ws_stream) = ws.split(); - ws_sink - .send(WsMessage::Text("REGISTER PRODUCER".to_string())) - .await?; - // 1000 is completely arbitrary, we simply don't want infinite piling // up of messages as with unbounded - let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); + let (mut websocket_sender, mut websocket_receiver) = + mpsc::channel::(1000); let element_clone = element.downgrade(); let send_task_handle = task::spawn(async move { while let Some(msg) = websocket_receiver.next().await { if let Some(element) = element_clone.upgrade() { gst_trace!(CAT, obj: &element, "Sending websocket message {:?}", msg); } - ws_sink.send(msg).await?; + ws_sink + .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) + .await?; } if let Some(element) = element_clone.upgrade() { @@ -118,6 +89,10 @@ impl Signaller { Ok::<(), Error>(()) }); + websocket_sender + .send(p::IncomingMessage::Register(p::RegisterMessage::Producer)) + .await?; + let element_clone = element.downgrade(); let receive_task_handle = task::spawn(async move { while let Some(msg) = async_std::stream::StreamExt::next(&mut ws_stream).await { @@ -126,50 +101,78 @@ impl Signaller { Ok(WsMessage::Text(msg)) => { gst_trace!(CAT, obj: &element, "Received message {}", msg); - if msg.starts_with("REGISTERED ") { - gst_info!(CAT, obj: &element, "We are registered with the server"); - } else if let Some(peer_id) = msg.strip_prefix("START_SESSION ") { - if let Err(err) = element.add_consumer(peer_id) { - gst_warning!(CAT, obj: &element, "{}", err); - } - } else if let Some(peer_id) = msg.strip_prefix("END_SESSION ") { - if let Err(err) = element.remove_consumer(peer_id) { - gst_warning!(CAT, obj: &element, "{}", err); - } - } else if let Ok(msg) = serde_json::from_str::(&msg) { - match msg.inner { - JsonMsgInner::Sdp(SdpMessage::Answer { sdp }) => { - if let Err(err) = element.handle_sdp( - &msg.peer_id, - &gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) - .unwrap(), - ), - ) { + if let Ok(msg) = serde_json::from_str::(&msg) { + match msg { + p::OutgoingMessage::Registered( + p::RegisteredMessage::Producer { peer_id }, + ) => { + gst_info!( + CAT, + obj: &element, + "We are registered with the server, our peer id is {}", + peer_id + ); + } + p::OutgoingMessage::Registered(_) => unreachable!(), + p::OutgoingMessage::StartSession { peer_id } => { + if let Err(err) = element.add_consumer(&peer_id) { gst_warning!(CAT, obj: &element, "{}", err); } } - JsonMsgInner::Sdp(SdpMessage::Offer { .. }) => { + p::OutgoingMessage::EndSession { peer_id } => { + if let Err(err) = element.remove_consumer(&peer_id) { + gst_warning!(CAT, obj: &element, "{}", err); + } + } + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id, + peer_message, + }) => match peer_message { + p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => { + if let Err(err) = element.handle_sdp( + &peer_id, + &gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + gst_sdp::SDPMessage::parse_buffer( + sdp.as_bytes(), + ) + .unwrap(), + ), + ) { + gst_warning!(CAT, obj: &element, "{}", err); + } + } + p::PeerMessageInner::Sdp(p::SdpMessage::Offer { + .. + }) => { + gst_warning!( + CAT, + obj: &element, + "Ignoring offer from peer" + ); + } + p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + } => { + if let Err(err) = element.handle_ice( + &peer_id, + Some(sdp_m_line_index), + None, + &candidate, + ) { + gst_warning!(CAT, obj: &element, "{}", err); + } + } + }, + _ => { gst_warning!( CAT, obj: &element, - "Ignoring offer from peer" + "Ignoring unsupported message {:?}", + msg ); } - JsonMsgInner::Ice { - candidate, - sdp_mline_index, - } => { - if let Err(err) = element.handle_ice( - &msg.peer_id, - Some(sdp_mline_index), - None, - &candidate, - ) { - gst_warning!(CAT, obj: &element, "{}", err); - } - } } } else { gst_error!( @@ -237,20 +240,17 @@ impl Signaller { ) { let state = self.state.lock().unwrap(); - let msg = JsonMsg { + let msg = p::IncomingMessage::Peer(p::PeerMessage { peer_id: peer_id.to_string(), - inner: JsonMsgInner::Sdp(SdpMessage::Offer { + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: sdp.sdp().as_text().unwrap(), }), - }; + }); if let Some(mut sender) = state.websocket_sender.clone() { let element = element.downgrade(); task::spawn(async move { - if let Err(err) = sender - .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await - { + if let Err(err) = sender.send(msg).await { if let Some(element) = element.upgrade() { element.handle_signalling_error(anyhow!("Error: {}", err).into()); } @@ -264,26 +264,23 @@ impl Signaller { element: &WebRTCSink, peer_id: &str, candidate: &str, - sdp_mline_index: Option, + sdp_m_line_index: Option, _sdp_mid: Option, ) { let state = self.state.lock().unwrap(); - let msg = JsonMsg { + let msg = p::IncomingMessage::Peer(p::PeerMessage { peer_id: peer_id.to_string(), - inner: JsonMsgInner::Ice { + peer_message: p::PeerMessageInner::Ice { candidate: candidate.to_string(), - sdp_mline_index: sdp_mline_index.unwrap(), + sdp_m_line_index: sdp_m_line_index.unwrap(), }, - }; + }); if let Some(mut sender) = state.websocket_sender.clone() { let element = element.downgrade(); task::spawn(async move { - if let Err(err) = sender - .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await - { + if let Err(err) = sender.send(msg).await { if let Some(element) = element.upgrade() { element.handle_signalling_error(anyhow!("Error: {}", err).into()); } @@ -324,7 +321,9 @@ impl Signaller { if let Some(mut sender) = state.websocket_sender.clone() { task::spawn(async move { if let Err(err) = sender - .send(WsMessage::Text(format!("END_SESSION {}", peer_id))) + .send(p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: peer_id.to_string(), + })) .await { if let Some(element) = element.upgrade() { diff --git a/plugins/src/webrtcsink/imp.rs b/plugins/src/webrtcsink/imp.rs index 71f357f3..66231e82 100644 --- a/plugins/src/webrtcsink/imp.rs +++ b/plugins/src/webrtcsink/imp.rs @@ -1455,14 +1455,14 @@ impl WebRTCSink { &self, element: &super::WebRTCSink, peer_id: String, - sdp_mline_index: u32, + sdp_m_line_index: u32, candidate: String, ) { let mut state = self.state.lock().unwrap(); if let Err(err) = state .signaller - .handle_ice(element, &peer_id, &candidate, Some(sdp_mline_index), None) + .handle_ice(element, &peer_id, &candidate, Some(sdp_m_line_index), None) { gst_warning!( CAT, @@ -1516,12 +1516,12 @@ impl WebRTCSink { webrtcbin.connect("on-ice-candidate", false, move |values| { if let Some(element) = element_clone.upgrade() { let this = Self::from_instance(&element); - let sdp_mline_index = values[1].get::().expect("Invalid argument"); + let sdp_m_line_index = values[1].get::().expect("Invalid argument"); let candidate = values[2].get::().expect("Invalid argument"); this.on_ice_candidate( &element, peer_id_clone.to_string(), - sdp_mline_index, + sdp_m_line_index, candidate, ); } @@ -1864,19 +1864,19 @@ impl WebRTCSink { &self, _element: &super::WebRTCSink, peer_id: &str, - sdp_mline_index: Option, + sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, ) -> Result<(), WebRTCSinkError> { let state = self.state.lock().unwrap(); - let sdp_mline_index = sdp_mline_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; + let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; if let Some(consumer) = state.consumers.get(peer_id) { gst_trace!(CAT, "adding ice candidate for peer {}", peer_id); consumer .webrtcbin - .emit_by_name::<()>("add-ice-candidate", &[&sdp_mline_index, &candidate]); + .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); Ok(()) } else { Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())) diff --git a/plugins/src/webrtcsink/mod.rs b/plugins/src/webrtcsink/mod.rs index 4b7080bb..c700c1b8 100644 --- a/plugins/src/webrtcsink/mod.rs +++ b/plugins/src/webrtcsink/mod.rs @@ -41,14 +41,14 @@ pub trait Signallable: Sync + Send + 'static { /// sdp_mid is exposed for future proofing, see /// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - /// at the moment sdp_mline_index will always be Some and sdp_mid will always + /// at the moment sdp_m_line_index will always be Some and sdp_mid will always /// be None fn handle_ice( &mut self, element: &WebRTCSink, peer_id: &str, candidate: &str, - sdp_mline_index: Option, + sdp_m_line_index: Option, sdp_mid: Option, ) -> Result<(), Box>; @@ -96,17 +96,17 @@ impl WebRTCSink { /// sdp_mid is exposed for future proofing, see /// https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - /// at the moment sdp_mline_index must be Some + /// at the moment sdp_m_line_index must be Some pub fn handle_ice( &self, peer_id: &str, - sdp_mline_index: Option, + sdp_m_line_index: Option, sdp_mid: Option, candidate: &str, ) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.handle_ice(self, peer_id, sdp_mline_index, sdp_mid, candidate) + ws.handle_ice(self, peer_id, sdp_m_line_index, sdp_mid, candidate) } pub fn handle_signalling_error(&self, error: Box) { diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml new file mode 100644 index 00000000..8d5f3a0c --- /dev/null +++ b/protocol/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name="webrtcsink-protocol" +version = "0.1.0" +edition = "2018" +authors = ["Mathieu Duponchelle "] +license = "MIT" +description = "GStreamer WebRTC sink default protocol" +repository = "https://github.com/centricular/webrtcsink/" + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs new file mode 100644 index 00000000..55d9d8a8 --- /dev/null +++ b/protocol/src/lib.rs @@ -0,0 +1,134 @@ +/// The default protocol used by the signalling server +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "peerType")] +#[serde(rename_all = "camelCase")] +/// Confirms registration +pub enum RegisteredMessage { + /// Registered as a producer + #[serde(rename_all = "camelCase")] + Producer { peer_id: String }, + /// Registered as a consumer + #[serde(rename_all = "camelCase")] + Consumer { peer_id: String }, + /// Registered as a listener + #[serde(rename_all = "camelCase")] + Listener { peer_id: String }, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +/// Messages sent from the server to peers +pub enum OutgoingMessage { + /// Confirms registration + Registered(RegisteredMessage), + /// Notifies listeners that a producer was registered + #[serde(rename_all = "camelCase")] + ProducerAdded { peer_id: String }, + /// Notifies listeners that a producer was removed + #[serde(rename_all = "camelCase")] + ProducerRemoved { peer_id: String }, + /// Instructs a peer to generate an offer + #[serde(rename_all = "camelCase")] + StartSession { peer_id: String }, + /// Signals that the session the peer was in was ended + #[serde(rename_all = "camelCase")] + EndSession { peer_id: String }, + /// Messages directly forwarded from one peer to another + Peer(PeerMessage), + /// Provides the current list of consumer peers + List { producers: Vec }, + /// Notifies that an error occurred with the peer's current session + Error { details: String }, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "peerType")] +#[serde(rename_all = "camelCase")] +/// Register with a peer type +pub enum RegisterMessage { + /// Register as a producer + Producer, + /// Register as a consumer + Consumer, + /// Register as a listener + Listener, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +/// Ask the server to start a session with a producer peer +pub struct StartSessionMessage { + /// Identifies the peer + pub peer_id: String, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +/// Conveys a SDP +pub enum SdpMessage { + /// Conveys an offer + Offer { + /// The SDP + sdp: String, + }, + /// Conveys an answer + Answer { + /// The SDP + sdp: String, + }, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +/// Contents of the peer message +pub enum PeerMessageInner { + /// Conveys an ICE candidate + #[serde(rename_all = "camelCase")] + Ice { + /// The candidate string + candidate: String, + /// The mline index the candidate applies to + sdp_m_line_index: u32, + }, + Sdp(SdpMessage), +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +/// Messages directly forwarded from one peer to another +pub struct PeerMessage { + /// The identifier of the peer, which must be in a session with the sender + pub peer_id: String, + /// The contents of the message + #[serde(flatten)] + pub peer_message: PeerMessageInner, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +/// End a session +pub struct EndSessionMessage { + /// The identifier of the peer to end the session with + pub peer_id: String, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +/// Messages received by the server from peers +pub enum IncomingMessage { + /// Register as a peer type + Register(RegisterMessage), + /// Start a session with a producer peer + StartSession(StartSessionMessage), + /// End an existing session + EndSession(EndSessionMessage), + /// Send a message to a peer the sender is currently in session with + Peer(PeerMessage), + /// Retrieve the current list of producers + List, +} diff --git a/signalling/Cargo.toml b/signalling/Cargo.toml new file mode 100644 index 00000000..ea2f5553 --- /dev/null +++ b/signalling/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name="webrtcsink-signalling" +version = "0.1.0" +edition = "2018" +authors = ["Mathieu Duponchelle "] +license = "MIT" +description = "GStreamer WebRTC sink signalling server" +repository = "https://github.com/centricular/webrtcsink/" + +[dependencies] +anyhow = "1" +async-std = { version = "1", features = ["unstable", "attributes"] } +async-native-tls = "0.3" +async-tungstenite = { version = "0.17", features = ["async-std-runtime", "async-native-tls"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +clap = { version = "3", features = ["derive"] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } +tracing-log = "0.1" +futures = "0.3" +uuid = { version = "0.8", features = ["v4"] } +thiserror = "1" +test-log = { version = "0.2", features = ["trace"], default-features = false } +pin-project-lite = "0.2" +webrtcsink-protocol = { version = "0.1", path="../protocol" } diff --git a/signalling/simple-server.py b/signalling/simple-server.py deleted file mode 100755 index 1bc0c5f2..00000000 --- a/signalling/simple-server.py +++ /dev/null @@ -1,253 +0,0 @@ -#!/usr/bin/env python3 -# -# Example 1-1 call signalling server -# -# Copyright (C) 2017 Centricular Ltd. -# -# Author: Nirbheek Chauhan -# - -import os -import sys -import ssl -import logging -import asyncio -import websockets -import argparse -import json -import uuid -import concurrent - -from collections import defaultdict - -from concurrent.futures._base import TimeoutError - -parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on') -parser.add_argument('--port', default=8443, type=int, help='Port to listen on') -parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)') -parser.add_argument('--disable-ssl', default=False, help='Disable ssl', action='store_true') -parser.add_argument('--hide-local-candidates', '-hd', default=False, help='Hide local Ice candidates', action='store_true') - -options = parser.parse_args(sys.argv[1:]) - -ADDR_PORT = (options.addr, options.port) -KEEPALIVE_TIMEOUT = options.keepalive_timeout - -logger = logging.getLogger('webrtc.signalling') -handler = logging.StreamHandler(sys.stderr) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) -logger.setLevel(logging.INFO) -logger.addHandler(handler) - -############### Global data ############### - -# Format: {uid: (Peer WebSocketServerProtocol, -# remote_address, -# [<'session'>,]} -peers = dict() -# Format: {caller_uid: [callee_uid,], -# callee_uid: [caller_uid,]} -# Bidirectional mapping between the two peers -sessions = defaultdict(list) - -producers = set() -consumers = set() -listeners = set() - -############### Helper functions ############### - -async def recv_msg_ping(ws, raddr): - ''' - Wait for a message forever, and send a regular ping to prevent bad routers - from closing the connection. - ''' - msg = None - while msg is None: - try: - msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT) - except (asyncio.TimeoutError, concurrent.futures._base.TimeoutError): - logger.debug('Sending keepalive ping to {!r} in recv'.format(raddr)) - await ws.ping() - return msg - -async def cleanup_session(uid): - for other_id in sessions[uid]: - await peers[other_id][0].send('END_SESSION {}'.format(uid)) - sessions[other_id].remove(uid) - logger.info("Cleaned up {} -> {} session".format(uid, other_id)) - del sessions[uid] - -async def end_session(uid, other_id): - if not other_id in sessions[uid]: - return - - if not uid in sessions[other_id]: - return - - if not other_id in peers: - return - - await peers[other_id][0].send('END_SESSION {}'.format(uid)) - sessions[uid].remove(other_id) - sessions[other_id].remove(uid) - -async def remove_peer(uid): - await cleanup_session(uid) - if uid in peers: - ws, raddr, status = peers[uid] - del peers[uid] - await ws.close() - logger.info("Disconnected from peer {!r} at {!r}".format(uid, raddr)) - - if uid in producers: - for peer_id in listeners: - await peers[peer_id][0].send('REMOVE PRODUCER {}'.format(uid)) - - if uid in producers: - producers.remove(uid) - elif uid in consumers: - consumers.remove(uid) - elif uid in listeners: - listeners.remove(uid) - -############### Handler functions ############### - -async def connection_handler(ws, uid): - global peers, sessions - raddr = ws.remote_address - peers_status = [] - peers[uid] = [ws, raddr, peers_status] - logger.info("Registered peer {!r} at {!r}".format(uid, raddr)) - while True: - # Receive command, wait forever if necessary - msg = await recv_msg_ping(ws, raddr) - # Update current status - peers_status = peers[uid][2] - # Requested a session with a specific peer - if msg.startswith('START_SESSION '): - logger.info("{!r} command {!r}".format(uid, msg)) - _, callee_id = msg.split(maxsplit=1) - if callee_id not in peers: - await ws.send('ERROR peer {!r} not found'.format(callee_id)) - continue - wsc = peers[callee_id][0] - await wsc.send('START_SESSION {}'.format(uid)) - logger.info('Session from {!r} ({!r}) to {!r} ({!r})' - ''.format(uid, raddr, callee_id, wsc.remote_address)) - # Register session - peers[uid][2] = peer_status = 'session' - sessions[uid].append(callee_id) - peers[callee_id][2] = 'session' - sessions[callee_id] .append(uid) - elif msg.startswith('END_SESSION '): - _, peer_id = msg.split(maxsplit=1) - await end_session(uid, peer_id) - elif msg.startswith('LIST'): - answer = ' '.join(['LIST'] + [other_id for other_id in peers if other_id in producers]) - await ws.send(answer) - # We are in a session, messages must be relayed - else: - # We're in a session, route message to connected peer - msg = json.loads(msg) - - if options.hide_local_candidates: - candidate = msg.get("ice", {}).get("candidate") - if candidate: - if candidate.split()[4].endswith(".local"): - logger.info(f"Ignoring local candidate: {candidate}") - continue - - other_id = msg['peer-id'] - try: - wso, oaddr, status = peers[other_id] - except KeyError: - continue - - msg['peer-id'] = uid - msg = json.dumps(msg) - logger.debug("Got peer: {} -> {}: {}".format(uid, other_id, msg)) - await wso.send(msg) - -async def register_peer(ws): - ''' - Register peer - ''' - raddr = ws.remote_address - msg = await ws.recv() - cmd, typ = msg.split(maxsplit=1) - - uid = str(uuid.uuid4()) - - while uid in peers: - uid = str(uuid.uuid4()) - - if cmd != 'REGISTER': - await ws.close(code=1002, reason='invalid protocol') - raise Exception("Invalid registration from {!r}".format(raddr)) - if typ not in ('PRODUCER', 'CONSUMER', 'LISTENER'): - await ws.close(code=1002, reason='invalid protocol') - raise Exception("Invalid registration from {!r}".format(raddr)) - # Send back a HELLO - await ws.send('REGISTERED {}'.format(uid)) - return typ, uid - -async def handler(ws, path): - ''' - All incoming messages are handled here. @path is unused. - ''' - raddr = ws.remote_address - logger.info("Connected to {!r}".format(raddr)) - try: - typ, peer_id = await register_peer(ws) - except: - return - if typ == 'PRODUCER': - for other_id in listeners: - await peers[other_id][0].send('ADD PRODUCER {}'.format(peer_id)) - producers.add(peer_id) - elif typ == 'CONSUMER': - consumers.add(peer_id) - elif typ == 'LISTENER': - listeners.add(peer_id) - - try: - await connection_handler(ws, peer_id) - except websockets.ConnectionClosed: - logger.info("Connection to peer {!r} closed, exiting handler".format(raddr)) - finally: - await remove_peer(peer_id) - -if options.disable_ssl: - sslctx = None -else: - # Create an SSL context to be used by the websocket server - certpath = '.' - chain_pem = os.path.join(certpath, 'cert.pem') - key_pem = os.path.join(certpath, 'key.pem') - - sslctx = ssl.create_default_context() - try: - sslctx.load_cert_chain(chain_pem, keyfile=key_pem) - except FileNotFoundError: - logger.error("Certificates not found, did you run generate_cert.sh?") - sys.exit(1) - # FIXME - sslctx.check_hostname = False - sslctx.verify_mode = ssl.CERT_NONE - -logger.info("Listening on wss://{}:{}".format(*ADDR_PORT)) -# Websocket server -wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx, - # Maximum number of messages that websockets will pop - # off the asyncio and OS buffers per connection. See: - # https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol - max_queue=16) - -ws_logger = logging.getLogger('websockets.server') -ws_logger.setLevel(logging.ERROR) -ws_logger.addHandler(logging.StreamHandler()) - -asyncio.get_event_loop().run_until_complete(wsd) -asyncio.get_event_loop().run_forever() diff --git a/signalling/src/bin/server.rs b/signalling/src/bin/server.rs new file mode 100644 index 00000000..65f41055 --- /dev/null +++ b/signalling/src/bin/server.rs @@ -0,0 +1,101 @@ +use async_std::task; +use clap::Parser; +use tracing_subscriber::prelude::*; +use webrtcsink_signalling::handlers::Handler; +use webrtcsink_signalling::server::Server; + +use anyhow::Error; +use async_native_tls::TlsAcceptor; +use async_std::fs::File as AsyncFile; +use async_std::net::TcpListener; +use tracing::{info, warn}; + +#[derive(Parser, Debug)] +#[clap(about, version, author)] +/// Program arguments +struct Args { + /// Address to listen on + #[clap(short, long, default_value = "0.0.0.0")] + host: String, + /// Port to listen on + #[clap(short, long, default_value_t = 8443)] + port: u16, + /// TLS certificate to use + #[clap(short, long)] + cert: Option, + /// password to TLS certificate + #[clap(long)] + cert_password: Option, +} + +fn initialize_logging(envvar_name: &str) -> Result<(), Error> { + tracing_log::LogTracer::init()?; + let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name) + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::NEW + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(fmt_layer); + tracing::subscriber::set_global_default(subscriber)?; + + Ok(()) +} + +fn main() -> Result<(), Error> { + let args = Args::parse(); + let server = Server::spawn(|stream| Handler::new(stream)); + + initialize_logging("WEBRTCSINK_SIGNALLING_SERVER_LOG")?; + + task::block_on(async move { + let addr = format!("{}:{}", args.host, args.port); + + // Create the event loop and TCP listener we'll accept connections on. + let listener = TcpListener::bind(&addr).await?; + + let acceptor = match args.cert { + Some(cert) => { + let key = AsyncFile::open(cert).await?; + Some(TlsAcceptor::new(key, args.cert_password.as_deref().unwrap_or("")).await?) + } + None => None, + }; + + info!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + let mut server_clone = server.clone(); + + let address = match stream.peer_addr() { + Ok(address) => address, + Err(err) => { + warn!("Connected peer with no address: {}", err); + continue; + } + }; + + info!("Accepting connection from {}", address); + + if let Some(ref acceptor) = acceptor { + let stream = match acceptor.accept(stream).await { + Ok(stream) => stream, + Err(err) => { + warn!("Failed to accept TLS connection from {}: {}", address, err); + continue; + } + }; + task::spawn(async move { server_clone.accept_async(stream).await }); + } else { + task::spawn(async move { server_clone.accept_async(stream).await }); + } + } + + Ok(()) + }) +} diff --git a/signalling/src/handlers/mod.rs b/signalling/src/handlers/mod.rs new file mode 100644 index 00000000..3fc27938 --- /dev/null +++ b/signalling/src/handlers/mod.rs @@ -0,0 +1,1091 @@ +use anyhow::{anyhow, Error}; +use futures::prelude::*; +use futures::ready; +use pin_project_lite::pin_project; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tracing::{info, instrument}; +use webrtcsink_protocol as p; + +type PeerId = String; + +pin_project! { + #[must_use = "streams do nothing unless polled"] + pub struct Handler { + #[pin] + stream: Pin)> + Send>>, + items: VecDeque<(String, p::OutgoingMessage)>, + producers: HashMap>, + consumers: HashMap>, + listeners: HashSet, + } +} + +impl Handler { + #[instrument(level = "debug", skip(stream))] + /// Create a handler + pub fn new( + stream: Pin)> + Send>>, + ) -> Self { + Self { + stream, + items: VecDeque::new(), + producers: HashMap::new(), + consumers: HashMap::new(), + listeners: HashSet::new(), + } + } + + #[instrument(level = "trace", skip(self))] + fn handle( + mut self: Pin<&mut Self>, + peer_id: &str, + msg: p::IncomingMessage, + ) -> Result<(), Error> { + match msg { + p::IncomingMessage::Register(message) => match message { + p::RegisterMessage::Producer => self.register_producer(peer_id), + p::RegisterMessage::Consumer => self.register_consumer(peer_id), + p::RegisterMessage::Listener => self.register_listener(peer_id), + }, + p::IncomingMessage::StartSession(message) => { + self.start_session(&message.peer_id, peer_id) + } + p::IncomingMessage::Peer(p::PeerMessage { + peer_id: other_peer_id, + peer_message, + }) => match peer_message { + p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + } => self.handle_ice(candidate, sdp_m_line_index, peer_id, &other_peer_id), + p::PeerMessageInner::Sdp(sdp_message) => match sdp_message { + p::SdpMessage::Offer { sdp } => { + self.handle_sdp_offer(sdp, peer_id, &other_peer_id) + } + p::SdpMessage::Answer { sdp } => { + self.handle_sdp_answer(sdp, peer_id, &other_peer_id) + } + }, + }, + p::IncomingMessage::List => self.list_producers(peer_id), + p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: other_peer_id, + }) => self.end_session(peer_id, &other_peer_id), + } + } + + #[instrument(level = "debug", skip(self))] + /// Remove a peer, this can cause sessions to be ended + fn remove_peer(&mut self, peer_id: &str) { + info!(peer_id = %peer_id, "removing peer"); + + self.listeners.remove(peer_id); + + if let Some(consumers) = self.producers.remove(peer_id) { + for consumer_id in &consumers { + info!(producer_id=%peer_id, consumer_id=%consumer_id, "ended session"); + self.consumers.insert(consumer_id.clone(), None); + self.items.push_back(( + consumer_id.to_string(), + p::OutgoingMessage::EndSession { + peer_id: peer_id.to_string(), + }, + )); + } + + for listener in &self.listeners { + self.items.push_back(( + listener.to_string(), + p::OutgoingMessage::ProducerRemoved { + peer_id: peer_id.to_string(), + }, + )); + } + } + + if let Some(Some(producer_id)) = self.consumers.remove(peer_id) { + info!(producer_id=%producer_id, consumer_id=%peer_id, "ended session"); + + self.producers + .get_mut(&producer_id) + .unwrap() + .remove(peer_id); + + self.items.push_back(( + producer_id.to_string(), + p::OutgoingMessage::EndSession { + peer_id: peer_id.to_string(), + }, + )); + } + } + + #[instrument(level = "debug", skip(self))] + /// End a session between two peers + fn end_session(&mut self, peer_id: &str, other_peer_id: &str) -> Result<(), Error> { + info!(peer_id=%peer_id, other_peer_id=%other_peer_id, "endsession request"); + if let Some(ref mut consumers) = self.producers.get_mut(peer_id) { + if consumers.remove(other_peer_id) { + info!(producer_id=%peer_id, consumer_id=%other_peer_id, "ended session"); + + self.items.push_back(( + other_peer_id.to_string(), + p::OutgoingMessage::EndSession { + peer_id: peer_id.to_string(), + }, + )); + + self.consumers.insert(other_peer_id.to_string(), None); + Ok(()) + } else { + Err(anyhow!( + "Producer {} has no consumer {}", + peer_id, + other_peer_id + )) + } + } else if let Some(Some(producer_id)) = self.consumers.get(peer_id) { + if producer_id == other_peer_id { + info!(producer_id=%other_peer_id, consumer_id=%peer_id, "ended session"); + + self.consumers.insert(peer_id.to_string(), None); + self.producers + .get_mut(other_peer_id) + .unwrap() + .remove(peer_id); + + self.items.push_back(( + other_peer_id.to_string(), + p::OutgoingMessage::EndSession { + peer_id: peer_id.to_string(), + }, + )); + + Ok(()) + } else { + Err(anyhow!( + "Consumer {} is not in a session with {}", + peer_id, + other_peer_id + )) + } + } else { + Err(anyhow!( + "No session between {} and {}", + peer_id, + other_peer_id + )) + } + } + + /// List producer peers + #[instrument(level = "debug", skip(self))] + fn list_producers(&mut self, peer_id: &str) -> Result<(), Error> { + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::List { + producers: self.producers.keys().cloned().collect(), + }, + )); + + Ok(()) + } + + /// Handle ICE candidate sent by one peer to another peer + #[instrument(level = "debug", skip(self))] + fn handle_ice( + &mut self, + candidate: String, + sdp_m_line_index: u32, + peer_id: &str, + other_peer_id: &str, + ) -> Result<(), Error> { + if let Some(consumers) = self.producers.get(peer_id) { + if consumers.contains(other_peer_id) { + self.items.push_back(( + other_peer_id.to_string(), + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: peer_id.to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + }, + }), + )); + Ok(()) + } else { + Err(anyhow!( + "cannot forward ICE from {} to {} as they are not in a session", + peer_id, + other_peer_id + )) + } + } else if let Some(producer) = self.consumers.get(peer_id) { + if &Some(other_peer_id.to_string()) == producer { + self.items.push_back(( + other_peer_id.to_string(), + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: peer_id.to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + }, + }), + )); + + Ok(()) + } else { + Err(anyhow!( + "cannot forward ICE from {} to {} as they are not in a session", + peer_id, + other_peer_id + )) + } + } else { + Err(anyhow!( + "cannot forward ICE from {} to {} as they are not in a session", + peer_id, + other_peer_id, + )) + } + } + + /// Handle SDP offered by one peer to another peer + #[instrument(level = "debug", skip(self))] + fn handle_sdp_offer( + &mut self, + sdp: String, + producer_id: &str, + consumer_id: &str, + ) -> Result<(), Error> { + if let Some(consumers) = self.producers.get(producer_id) { + if consumers.contains(consumer_id) { + self.items.push_back(( + consumer_id.to_string(), + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: producer_id.to_string(), + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp }), + }), + )); + Ok(()) + } else { + Err(anyhow!( + "cannot forward offer from {} to {} as they are not in a session", + producer_id, + consumer_id + )) + } + } else { + Err(anyhow!( + "cannot forward offer from {} to {} as they are not in a session or {} is not the producer", + producer_id, + consumer_id, + producer_id, + )) + } + } + + /// Handle the SDP answer from one peer to another peer + #[instrument(level = "debug", skip(self))] + fn handle_sdp_answer( + &mut self, + sdp: String, + consumer_id: &str, + producer_id: &str, + ) -> Result<(), Error> { + if let Some(producer) = self.consumers.get(consumer_id) { + if &Some(producer_id.to_string()) == producer { + self.items.push_back(( + producer_id.to_string(), + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: consumer_id.to_string(), + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }), + }), + )); + Ok(()) + } else { + Err(anyhow!( + "cannot forward answer from {} to {} as they are not in a session", + consumer_id, + producer_id + )) + } + } else { + Err(anyhow!( + "cannot forward answer from {} to {} as they are not in a session", + consumer_id, + producer_id + )) + } + } + + /// Register peer as a producer + #[instrument(level = "debug", skip(self))] + fn register_producer(&mut self, peer_id: &str) -> Result<(), Error> { + if self.producers.contains_key(peer_id) { + Err(anyhow!("{} is already registered as a producer", peer_id)) + } else { + self.producers.insert(peer_id.to_string(), HashSet::new()); + + for listener in &self.listeners { + self.items.push_back(( + listener.to_string(), + p::OutgoingMessage::ProducerAdded { + peer_id: peer_id.to_string(), + }, + )); + } + + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { + peer_id: peer_id.to_string(), + }), + )); + + info!(peer_id = %peer_id, "registered as a producer"); + + Ok(()) + } + } + + /// Register peer as a consumer + #[instrument(level = "debug", skip(self))] + fn register_consumer(&mut self, peer_id: &str) -> Result<(), Error> { + if self.consumers.contains_key(peer_id) { + Err(anyhow!("{} is already registered as a consumer", peer_id)) + } else { + self.consumers.insert(peer_id.to_string(), None); + + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { + peer_id: peer_id.to_string(), + }), + )); + + info!(peer_id = %peer_id, "registered as a consumer"); + + Ok(()) + } + } + + /// Register peer as a listener + #[instrument(level = "debug", skip(self))] + fn register_listener(&mut self, peer_id: &str) -> Result<(), Error> { + if !self.listeners.insert(peer_id.to_string()) { + Err(anyhow!("{} is already registered as a listener", peer_id)) + } else { + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::Registered(p::RegisteredMessage::Listener { + peer_id: peer_id.to_string(), + }), + )); + + info!(peer_id = %peer_id, "registered as a listener"); + + Ok(()) + } + } + + /// Start a session between two peers + #[instrument(level = "debug", skip(self))] + fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> { + if !self.consumers.contains_key(consumer_id) { + return Err(anyhow!( + "Peer with id {} is not registered as a consumer", + consumer_id + )); + } + + if let Some(producer_id) = self.consumers.get(consumer_id).unwrap() { + return Err(anyhow!( + "Consumer with id {} is already in a session with producer {}", + consumer_id, + producer_id, + )); + } + + if !self.producers.contains_key(producer_id) { + return Err(anyhow!( + "Peer with id {} is not registered as a producer", + producer_id + )); + } + + self.consumers + .insert(consumer_id.to_string(), Some(producer_id.to_string())); + self.producers + .get_mut(producer_id) + .unwrap() + .insert(consumer_id.to_string()); + + self.items.push_back(( + producer_id.to_string(), + p::OutgoingMessage::StartSession { + peer_id: consumer_id.to_string(), + }, + )); + + info!(producer_id = %producer_id, consumer_id = %consumer_id, "started a session"); + + Ok(()) + } +} + +impl Stream for Handler { + type Item = (String, p::OutgoingMessage); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let this = self.as_mut().project(); + + if let Some(item) = this.items.pop_front() { + break Poll::Ready(Some(item)); + } + + match ready!(this.stream.poll_next(cx)) { + Some((peer_id, msg)) => { + if let Some(msg) = msg { + if let Err(err) = self.as_mut().handle(&peer_id, msg) { + self.items.push_back(( + peer_id.to_string(), + p::OutgoingMessage::Error { + details: err.to_string(), + }, + )); + } + } else { + self.remove_peer(&peer_id); + } + } + None => { + break Poll::Ready(None); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::channel::mpsc; + + #[async_std::test] + async fn test_register_producer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { + peer_id: "producer".to_string() + }) + ); + } + + #[async_std::test] + async fn test_list_producers() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::List; + + tx.send(("listener".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::List { + producers: vec!["producer".to_string()] + } + ); + } + + #[async_std::test] + async fn test_register_consumer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { + peer_id: "consumer".to_string() + }) + ); + } + + #[async_std::test] + async fn test_register_producer_twice() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Error { + details: "producer is already registered as a producer".into() + } + ); + } + + #[async_std::test] + async fn test_listener() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Listener); + tx.send(("listener".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::ProducerAdded { + peer_id: "producer".to_string() + } + ); + } + + #[async_std::test] + async fn test_start_session() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string() + } + ); + } + + #[async_std::test] + async fn test_remove_peer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Listener); + tx.send(("listener".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + handler.remove_peer("producer"); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession { + peer_id: "producer".to_string() + } + ); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::ProducerRemoved { + peer_id: "producer".to_string() + } + ); + } + + #[async_std::test] + async fn test_end_session_consumer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession { + peer_id: "consumer".to_string() + } + ); + } + + #[async_std::test] + async fn test_end_session_producer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: "consumer".to_string(), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession { + peer_id: "producer".to_string() + } + ); + } + + #[async_std::test] + async fn test_end_session_twice() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: "consumer".to_string(), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::EndSession(p::EndSessionMessage { + peer_id: "consumer".to_string(), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Error { + details: "Producer producer has no consumer consumer".into() + } + ); + } + + #[async_std::test] + async fn test_sdp_exchange() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Peer(p::PeerMessage { + peer_id: "consumer".to_string(), + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { + sdp: "offer".to_string(), + }), + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: "producer".to_string(), + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { + sdp: "offer".to_string() + }) + }) + ); + } + + #[async_std::test] + async fn test_ice_exchange() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Peer(p::PeerMessage { + peer_id: "consumer".to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate: "candidate".to_string(), + sdp_m_line_index: 42, + }, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: "producer".to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate: "candidate".to_string(), + sdp_m_line_index: 42 + } + }) + ); + + let message = p::IncomingMessage::Peer(p::PeerMessage { + peer_id: "producer".to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate: "candidate".to_string(), + sdp_m_line_index: 42, + }, + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Peer(p::PeerMessage { + peer_id: "consumer".to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate: "candidate".to_string(), + sdp_m_line_index: 42 + } + }) + ); + } + + #[async_std::test] + async fn test_sdp_exchange_wrong_direction_offer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Peer(p::PeerMessage { + peer_id: "producer".to_string(), + peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { + sdp: "offer".to_string(), + }), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!(sent_message, + p::OutgoingMessage::Error { + details: "cannot forward offer from consumer to producer as they are not in a session or consumer is not the producer".into() + } + ); + } + + #[async_std::test] + async fn test_start_session_no_producer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Error { + details: "Peer with id producer is not registered as a producer".into() + } + ); + } + + #[async_std::test] + async fn test_start_session_no_consumer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Error { + details: "Peer with id consumer is not registered as a consumer".into() + } + ); + } + + #[async_std::test] + async fn test_start_session_twice() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Producer); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let _ = handler.next().await.unwrap(); + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); + + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::Error { + details: "Consumer with id consumer is already in a session with producer producer" + .into() + } + ); + } +} diff --git a/signalling/src/lib.rs b/signalling/src/lib.rs new file mode 100644 index 00000000..068798b4 --- /dev/null +++ b/signalling/src/lib.rs @@ -0,0 +1,2 @@ +pub mod handlers; +pub mod server; diff --git a/signalling/src/server/mod.rs b/signalling/src/server/mod.rs new file mode 100644 index 00000000..acde56bc --- /dev/null +++ b/signalling/src/server/mod.rs @@ -0,0 +1,182 @@ +use anyhow::Error; +use async_std::task; +use async_tungstenite::tungstenite::Message as WsMessage; +use futures::channel::mpsc; +use futures::prelude::*; +use futures::{AsyncRead, AsyncWrite}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use tracing::{info, instrument, trace, warn}; + +struct Peer { + receive_task_handle: task::JoinHandle<()>, + send_task_handle: task::JoinHandle>, + sender: mpsc::Sender, +} + +struct State { + tx: Option)>>, + peers: HashMap, +} + +#[derive(Clone)] +pub struct Server { + state: Arc>, +} + +#[derive(thiserror::Error, Debug)] +pub enum ServerError { + #[error("error during handshake {0}")] + Handshake(#[from] async_tungstenite::tungstenite::Error), +} + +impl Server { + #[instrument(level = "debug", skip(factory))] + pub fn spawn< + I: for<'a> Deserialize<'a>, + O: Serialize + std::fmt::Debug, + Factory: FnOnce(Pin)> + Send>>) -> St, + St: Stream, + >( + factory: Factory, + ) -> Self + where + O: Serialize + std::fmt::Debug, + St: Send + Unpin + 'static, + { + let (tx, rx) = mpsc::channel::<(String, Option)>(1000); + let mut handler = factory(Box::pin(rx.filter_map(|(peer_id, msg)| async move { + if let Some(msg) = msg { + match serde_json::from_str::(&msg) { + Ok(msg) => Some((peer_id, Some(msg))), + Err(err) => { + warn!("Failed to parse incoming message: {}", err); + None + } + } + } else { + Some((peer_id, None)) + } + }))); + + let state = Arc::new(Mutex::new(State { + tx: Some(tx), + peers: HashMap::new(), + })); + + let state_clone = state.clone(); + let _ = task::spawn(async move { + while let Some((peer_id, msg)) = handler.next().await { + match serde_json::to_string(&msg) { + Ok(msg) => { + if let Some(peer) = state_clone.lock().unwrap().peers.get_mut(&peer_id) { + let mut sender = peer.sender.clone(); + task::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } + Err(err) => { + warn!("Failed to serialize outgoing message: {}", err); + } + } + } + }); + + Self { state } + } + + #[instrument(level = "debug", skip(state))] + fn remove_peer(state: Arc>, peer_id: &str) { + if let Some(mut peer) = state.lock().unwrap().peers.remove(peer_id) { + let peer_id = peer_id.to_string(); + task::spawn(async move { + peer.sender.close_channel(); + if let Err(err) = peer.send_task_handle.await { + trace!(peer_id = %peer_id, "Error while joining send task: {}", err); + } + peer.receive_task_handle.await; + }); + } + } + + #[instrument(level = "debug", skip(self, stream))] + pub async fn accept_async(&mut self, stream: S) -> Result + where + S: AsyncRead + AsyncWrite + Unpin + Send, + { + let ws = match async_tungstenite::accept_async(stream).await { + Ok(ws) => ws, + Err(err) => { + warn!("Error during the websocket handshake: {}", err); + return Err(ServerError::Handshake(err)); + } + }; + + let this_id = uuid::Uuid::new_v4().to_string(); + info!(this_id = %this_id, "New WebSocket connection"); + + // 1000 is completely arbitrary, we simply don't want infinite piling + // up of messages as with unbounded + let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); + + let this_id_clone = this_id.clone(); + let (mut ws_sink, mut ws_stream) = ws.split(); + let send_task_handle = task::spawn(async move { + while let Some(msg) = websocket_receiver.next().await { + trace!(this_id = %this_id_clone, "sending {}", msg); + ws_sink.send(WsMessage::Text(msg)).await?; + } + + ws_sink.send(WsMessage::Close(None)).await?; + ws_sink.close().await?; + + Ok::<(), Error>(()) + }); + + let mut tx = self.state.lock().unwrap().tx.clone(); + let this_id_clone = this_id.clone(); + let state_clone = self.state.clone(); + let receive_task_handle = task::spawn(async move { + while let Some(msg) = ws_stream.next().await { + match msg { + Ok(WsMessage::Text(msg)) => { + if let Some(tx) = tx.as_mut() { + if let Err(err) = tx.send((this_id_clone.clone(), Some(msg))).await { + warn!(this = %this_id_clone, "Error handling message: {:?}", err); + } + } + } + Ok(WsMessage::Close(reason)) => { + info!(this_id = %this_id_clone, "connection closed: {:?}", reason); + break; + } + Ok(_) => warn!(this_id = %this_id_clone, "Unsupported message type"), + Err(err) => { + warn!(this_id = %this_id_clone, "recv error: {}", err); + break; + } + } + } + + if let Some(tx) = tx.as_mut() { + let _ = tx.send((this_id_clone.clone(), None)).await; + } + + Self::remove_peer(state_clone, &this_id_clone); + }); + + self.state.lock().unwrap().peers.insert( + this_id.clone(), + Peer { + receive_task_handle, + send_task_handle, + sender: websocket_sender, + }, + ); + + Ok(this_id) + } +} diff --git a/www/webrtc.js b/www/webrtc.js index 688de720..add5f79b 100644 --- a/www/webrtc.js +++ b/www/webrtc.js @@ -111,7 +111,11 @@ function Session(our_id, peer_id, closed_callback) { var thiz = this; this.peer_connection.setLocalDescription(desc).then(() => { this.setStatus("Sending SDP answer"); - var sdp = {'peer-id': this.peer_id, 'sdp': this.peer_connection.localDescription.toJSON()} + var sdp = { + 'type': 'peer', + 'peerId': this.peer_id, + 'sdp': this.peer_connection.localDescription.toJSON() + }; this.ws_conn.send(JSON.stringify(sdp)); }).catch(function(e) { thiz.setError(e); @@ -146,26 +150,27 @@ function Session(our_id, peer_id, closed_callback) { this.onServerMessage = function(event) { console.log("Received " + event.data); - if (event.data.startsWith("REGISTERED")) { + try { + msg = JSON.parse(event.data); + } catch (e) { + if (e instanceof SyntaxError) { + this.handleIncomingError("Error parsing incoming JSON: " + event.data); + } else { + this.handleIncomingError("Unknown error parsing response: " + event.data); + } + return; + } + + if (msg.type == "registered") { this.setStatus("Registered with server"); this.connectPeer(); - } else if (event.data.startsWith("ERROR")) { - this.handleIncomingError(event.data); - return; - } else { - // Handle incoming JSON SDP and ICE messages - try { - msg = JSON.parse(event.data); - } catch (e) { - if (e instanceof SyntaxError) { - this.handleIncomingError("Error parsing incoming JSON: " + event.data); - } else { - this.handleIncomingError("Unknown error parsing response: " + event.data); - } - return; - } - - // Incoming JSON signals the beginning of a call + } else if (msg.type == "error") { + this.handleIncomingError(msg.details); + } else if (msg.type == "endSession") { + this.resetState(); + this.closed_callback(this.our_id); + } else if (msg.type == "peer") { + // Incoming peer message signals the beginning of a call if (!this.peer_connection) this.createCall(msg); @@ -211,7 +216,10 @@ function Session(our_id, peer_id, closed_callback) { this.ws_conn = new WebSocket(ws_url); /* When connected, immediately register with the server */ this.ws_conn.addEventListener('open', (event) => { - this.ws_conn.send('REGISTER CONSUMER'); + this.ws_conn.send(JSON.stringify({ + "type": "register", + "peerType": "consumer" + })); this.setStatus("Registering with server"); }); this.ws_conn.addEventListener('error', this.onServerError.bind(this)); @@ -222,7 +230,10 @@ function Session(our_id, peer_id, closed_callback) { this.connectPeer = function() { this.setStatus("Connecting " + this.peer_id); - this.ws_conn.send("START_SESSION " + this.peer_id); + this.ws_conn.send(JSON.stringify({ + "type": "startSession", + "peerId": this.peer_id + })); }; this.onRemoteStreamAdded = function(event) { @@ -295,16 +306,16 @@ function Session(our_id, peer_id, closed_callback) { } } - if (!msg.sdp) { - console.log("WARNING: First message wasn't an SDP message!?"); - } - this.peer_connection.onicecandidate = (event) => { if (event.candidate == null) { console.log("ICE Candidate was null, done"); return; } - this.ws_conn.send(JSON.stringify({'ice': event.candidate.toJSON(), 'peer-id': this.peer_id})); + this.ws_conn.send(JSON.stringify({ + "type": "peer", + "peerId": this.peer_id, + "ice": event.candidate.toJSON() + })); }; this.setStatus("Created peer connection for call, waiting for SDP"); @@ -354,23 +365,34 @@ function clearPeers() { function onServerMessage(event) { console.log("Received " + event.data); - if (event.data.startsWith("REGISTERED ")) { - console.log("Listener registered"); - ws_conn.send('LIST'); - } else if (event.data.startsWith('LIST')) { - var split = event.data.split(' '); - clearPeers(); - for (var i = 1; i < split.length; i++) { - addPeer(split[i]); + try { + msg = JSON.parse(event.data); + } catch (e) { + if (e instanceof SyntaxError) { + this.handleIncomingError("Error parsing incoming JSON: " + event.data); + } else { + this.handleIncomingError("Unknown error parsing response: " + event.data); } - } else if (event.data.startsWith('ADD PRODUCER')) { - var split = event.data.split(' '); - addPeer(split[2]); - } else if (event.data.startsWith('REMOVE PRODUCER')) { - var split = event.data.split(' '); - var li = document.getElementById("peer-" + split[2]); + return; + } + + if (msg.type == "registered") { + ws_conn.send(JSON.stringify({ + "type": "list" + })); + } else if (msg.type == "list") { + clearPeers(); + for (i = 0; i < msg.producers.length; i++) { + addPeer(msg.producers[i]); + } + } else if (msg.type == "producerAdded") { + addPeer(msg.peerId); + } else if (msg.type == "producerRemoved") { + var li = document.getElementById("peer-" + msg.peerId); li.parentNode.removeChild(li); + } else { + this.handleIncomingError("Unsupported message: ", msg); } }; @@ -408,7 +430,10 @@ function connect() { console.log("Connecting listener"); ws_conn = new WebSocket(ws_url); ws_conn.addEventListener('open', (event) => { - ws_conn.send('REGISTER LISTENER'); + ws_conn.send(JSON.stringify({ + "type": "register", + "peerType": "listener" + })); }); ws_conn.addEventListener('error', onServerError); ws_conn.addEventListener('message', onServerMessage);