Update for new clone/closure macro syntax

Also fix various weak/strong references in the webrtc plugin, and make
sure to pass the object to debug log functions in every place.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1625>
This commit is contained in:
Sebastian Dröge 2024-06-17 20:01:07 +03:00
parent 90e926def4
commit 47d62b6d78
8 changed files with 956 additions and 854 deletions

179
Cargo.lock generated
View file

@ -981,7 +981,7 @@ dependencies = [
[[package]]
name = "cairo-rs"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"bitflags 2.5.0",
"cairo-sys-rs",
@ -993,11 +993,11 @@ dependencies = [
[[package]]
name = "cairo-sys-rs"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -1497,7 +1497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ecb1c5e8f4dc438eedc1b534a54672fb0e0a56035dae6b50162787bd2c50e95"
dependencies = [
"libc",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -1933,7 +1933,7 @@ dependencies = [
[[package]]
name = "gdk-pixbuf"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"gdk-pixbuf-sys",
"gio",
@ -1944,13 +1944,13 @@ dependencies = [
[[package]]
name = "gdk-pixbuf-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"gio-sys",
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -1980,7 +1980,7 @@ dependencies = [
"libc",
"pango-sys",
"pkg-config",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -2002,7 +2002,7 @@ source = "git+https://github.com/gtk-rs/gtk4-rs?branch=master#f7ff7a0c8f4b3b892e
dependencies = [
"glib-sys",
"libc",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -2026,7 +2026,7 @@ dependencies = [
"gdk4-sys",
"glib-sys",
"libc",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -2049,7 +2049,7 @@ dependencies = [
"gdk4-sys",
"glib-sys",
"libc",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -2103,7 +2103,7 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "gio"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"futures-channel",
"futures-core",
@ -2120,19 +2120,19 @@ dependencies = [
[[package]]
name = "gio-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
"windows-sys 0.52.0",
]
[[package]]
name = "glib"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"bitflags 2.5.0",
"futures-channel",
@ -2153,7 +2153,7 @@ dependencies = [
[[package]]
name = "glib-macros"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"heck 0.5.0",
"proc-macro-crate",
@ -2165,10 +2165,10 @@ dependencies = [
[[package]]
name = "glib-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -2180,17 +2180,17 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gobject-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "graphene-rs"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib",
"graphene-sys",
@ -2200,12 +2200,12 @@ dependencies = [
[[package]]
name = "graphene-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib-sys",
"libc",
"pkg-config",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -2245,7 +2245,7 @@ dependencies = [
"graphene-sys",
"libc",
"pango-sys",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -3122,7 +3122,7 @@ dependencies = [
[[package]]
name = "gstreamer"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"cfg-if",
"futures-channel",
@ -3148,7 +3148,7 @@ dependencies = [
[[package]]
name = "gstreamer-allocators"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3160,19 +3160,19 @@ dependencies = [
[[package]]
name = "gstreamer-allocators-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-app"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"futures-core",
"futures-sink",
@ -3186,19 +3186,19 @@ dependencies = [
[[package]]
name = "gstreamer-app-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-audio"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"cfg-if",
"glib",
@ -3214,20 +3214,20 @@ dependencies = [
[[package]]
name = "gstreamer-audio-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-base"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"atomic_refcell",
"cfg-if",
@ -3240,19 +3240,19 @@ dependencies = [
[[package]]
name = "gstreamer-base-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-check"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3262,19 +3262,19 @@ dependencies = [
[[package]]
name = "gstreamer-check-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-gl"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3288,7 +3288,7 @@ dependencies = [
[[package]]
name = "gstreamer-gl-egl"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3300,18 +3300,18 @@ dependencies = [
[[package]]
name = "gstreamer-gl-egl-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-gl-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-gl-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
@ -3319,13 +3319,13 @@ dependencies = [
"gstreamer-sys",
"gstreamer-video-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-gl-wayland"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3337,18 +3337,18 @@ dependencies = [
[[package]]
name = "gstreamer-gl-wayland-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-gl-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-gl-x11"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3360,18 +3360,18 @@ dependencies = [
[[package]]
name = "gstreamer-gl-x11-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-gl-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-net"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"gio",
"glib",
@ -3382,19 +3382,19 @@ dependencies = [
[[package]]
name = "gstreamer-net-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"gio-sys",
"glib-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-pbutils"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3408,7 +3408,7 @@ dependencies = [
[[package]]
name = "gstreamer-pbutils-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
@ -3416,13 +3416,13 @@ dependencies = [
"gstreamer-sys",
"gstreamer-video-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-rtp"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3433,19 +3433,19 @@ dependencies = [
[[package]]
name = "gstreamer-rtp-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-sdp"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3455,29 +3455,29 @@ dependencies = [
[[package]]
name = "gstreamer-sdp-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-utils"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"gstreamer",
"gstreamer-app",
@ -3489,7 +3489,7 @@ dependencies = [
[[package]]
name = "gstreamer-video"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"cfg-if",
"futures-channel",
@ -3506,20 +3506,20 @@ dependencies = [
[[package]]
name = "gstreamer-video-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "gstreamer-webrtc"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib",
"gstreamer",
@ -3531,13 +3531,13 @@ dependencies = [
[[package]]
name = "gstreamer-webrtc-sys"
version = "0.23.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4"
dependencies = [
"glib-sys",
"gstreamer-sdp-sys",
"gstreamer-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -3586,7 +3586,7 @@ dependencies = [
"gsk4-sys",
"libc",
"pango-sys",
"system-deps",
"system-deps 6.2.2",
]
[[package]]
@ -4397,7 +4397,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [
"cfg-if",
"windows-targets 0.52.5",
"windows-targets 0.48.5",
]
[[package]]
@ -5149,7 +5149,7 @@ dependencies = [
[[package]]
name = "pango"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"gio",
"glib",
@ -5160,18 +5160,18 @@ dependencies = [
[[package]]
name = "pango-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
name = "pangocairo"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"cairo-rs",
"glib",
@ -5183,13 +5183,13 @@ dependencies = [
[[package]]
name = "pangocairo-sys"
version = "0.20.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b"
source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50"
dependencies = [
"cairo-sys-rs",
"glib-sys",
"libc",
"pango-sys",
"system-deps",
"system-deps 7.0.1",
]
[[package]]
@ -5526,7 +5526,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"heck 0.4.1",
"itertools 0.12.1",
"log",
"multimap 0.10.0",
@ -5747,7 +5747,7 @@ dependencies = [
"rand",
"rand_chacha",
"simd_helpers",
"system-deps",
"system-deps 6.2.2",
"thiserror",
"v_frame",
]
@ -6741,6 +6741,19 @@ dependencies = [
"version-compare",
]
[[package]]
name = "system-deps"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c81f13d9a334a6c242465140bd262fae382b752ff2011c4f7419919a9c97922"
dependencies = [
"cfg-expr",
"heck 0.5.0",
"pkg-config",
"toml",
"version-compare",
]
[[package]]
name = "target-lexicon"
version = "0.12.14"

View file

@ -84,13 +84,16 @@ fn main() {
.property("signal-handoffs", true)
.build()
.unwrap();
sink.connect(
sink.connect_closure(
"handoff",
true,
glib::clone!(@strong counter => move |_| {
let _ = counter.fetch_add(1, Ordering::SeqCst);
None
}),
glib::closure!(
#[strong]
counter,
move |_fakesink: &gst::Element, _buffer: &gst::Buffer, _pad: &gst::Pad| {
let _ = counter.fetch_add(1, Ordering::SeqCst);
}
),
);
let (source, context) = match source.as_str() {

View file

@ -334,8 +334,10 @@ impl Signaller {
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (ws_sender, mut ws_receiver) = mpsc::channel::<OutgoingMessage>(1000);
let send_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
let send_task_handle = RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
let mut res = Ok(());
loop {
tokio::select! {
@ -349,7 +351,6 @@ impl Signaller {
None => break,
},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
if let Some(ref this) = this {
let (transaction, session_id, apisecret) = {
let state = this.state.lock().unwrap();
let settings = this.settings.lock().unwrap();
@ -368,45 +369,37 @@ impl Signaller {
res = ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await;
}
}
}
if let Err(ref err) = res {
this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send task: {err}"),
|this| gst::error!(CAT, imp: this, "Quitting send task: {err}")
);
gst::error!(CAT, imp: this, "Quitting send task: {err}");
break;
}
}
this.map_or_else(|| gst::debug!(CAT, "Done sending"),
|this| gst::debug!(CAT, imp: this, "Done sending")
);
gst::debug!(CAT, imp: this, "Done sending");
let _ = ws_sink.close().await;
res.map_err(Into::into)
}));
}
));
let recv_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
let recv_task_handle = RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(ref this) = this {
if let ControlFlow::Break(_) = this.handle_msg(msg) {
break;
}
} else {
if let ControlFlow::Break(_) = this.handle_msg(msg) {
break;
}
}
let msg = "Stopped websocket receiving";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
}));
gst::info!(CAT, imp: this, "{msg}");
}
));
let mut state = self.state.lock().unwrap();
state.ws_sender = Some(ws_sender);
@ -531,11 +524,15 @@ impl Signaller {
fn send(&self, msg: OutgoingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.ws_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender.send(msg).await {
this.raise_error(err.to_string());
RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
if let Err(err) = sender.send(msg).await {
this.raise_error(err.to_string());
}
}
}));
));
}
}
@ -545,11 +542,11 @@ impl Signaller {
fn send_blocking(&self, msg: OutgoingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.ws_sender.clone() {
RUNTIME.block_on(glib::clone!(@weak self as this => async move {
RUNTIME.block_on(async {
if let Err(err) = sender.send(msg).await {
this.raise_error(err.to_string());
self.raise_error(err.to_string());
}
}));
});
}
}

View file

@ -589,9 +589,7 @@ impl SignallableImpl for Signaller {
let weak_imp = self.downgrade();
RUNTIME.spawn(async move {
let imp = if let Some(imp) = weak_imp.upgrade() {
imp
} else {
let Some(imp) = weak_imp.upgrade() else {
return;
};
@ -634,35 +632,39 @@ impl SignallableImpl for Signaller {
}
}
let weak_imp = imp.downgrade();
imp.obj().connect_closure(
"webrtcbin-ready",
false,
glib::closure!(|_signaller: &super::LiveKitSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
gst::info!(CAT, "Adding data channels");
let reliable_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_reliable",
&gst::Structure::builder("config")
.field("ordered", true)
.build(),
],
);
let lossy_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_lossy",
&gst::Structure::builder("config")
.field("ordered", true)
.field("max-retransmits", 0)
.build(),
],
);
glib::closure!(
#[watch(rename_to = obj)]
imp.obj(),
move |_signaller: &super::LiveKitSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
let imp = obj.imp();
gst::info!(CAT, "Adding data channels");
let reliable_channel = webrtcbin
.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_reliable",
&gst::Structure::builder("config")
.field("ordered", true)
.build(),
],
);
let lossy_channel = webrtcbin
.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_lossy",
&gst::Structure::builder("config")
.field("ordered", true)
.field("max-retransmits", 0)
.build(),
],
);
if let Some(imp) = weak_imp.upgrade() {
let mut connection = imp.connection.lock().unwrap();
if let Some(connection) = connection.as_mut() {
connection.channels = Some(Channels {
@ -671,7 +673,7 @@ impl SignallableImpl for Signaller {
});
}
}
}),
),
);
let connection = Connection {

View file

@ -68,6 +68,7 @@ pub struct Signaller {
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
connect_task_handle: Option<task::JoinHandle<()>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
producers: HashSet<String>,
@ -173,8 +174,10 @@ impl Signaller {
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<p::IncomingMessage>(1000);
let send_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
let send_task_handle = RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
let mut res = Ok(());
while let Some(msg) = websocket_receiver.next().await {
gst::log!(CAT, "Sending websocket message {:?}", msg);
@ -183,21 +186,18 @@ impl Signaller {
.await;
if let Err(ref err) = res {
this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"),
|this| gst::error!(CAT, imp: this, "Quitting send loop: {err}")
);
gst::error!(CAT, imp: this, "Quitting send loop: {err}");
break;
}
}
this.map_or_else(|| gst::debug!(CAT, "Done sending"),
|this| gst::debug!(CAT, imp: this, "Done sending")
);
gst::debug!(CAT, imp: this, "Done sending");
let _ = ws_sink.close().await;
res.map_err(Into::into)
}));
}
));
let obj = self.obj();
let meta =
@ -207,23 +207,20 @@ impl Signaller {
None
};
let receive_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
let receive_task_handle = RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(ref this) = this {
if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
break;
}
} else {
if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
break;
}
}
let msg = "Stopped websocket receiving";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
}));
gst::info!(CAT, imp: this, "{msg}");
}
));
let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(websocket_sender);
@ -297,11 +294,16 @@ impl Signaller {
fn send(&self, msg: p::IncomingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender.send(msg).await {
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
if let Err(err) = sender.send(msg).await {
this.obj()
.emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}
}));
));
}
}
@ -636,17 +638,37 @@ impl ObjectImpl for Signaller {
impl SignallableImpl for Signaller {
fn start(&self) {
gst::info!(CAT, imp: self, "Starting");
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = this.connect().await {
this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
let mut state = self.state.lock().unwrap();
let connect_task_handle = RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
if let Err(err) = this.connect().await {
this.obj()
.emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
}
}
}));
));
state.connect_task_handle = Some(connect_task_handle);
}
fn stop(&self) {
gst::info!(CAT, imp: self, "Stopping now");
let mut state = self.state.lock().unwrap();
// First make sure the connect task is stopped if it is still
// running
let connect_task_handle = state.connect_task_handle.take();
if let Some(handle) = connect_task_handle {
RUNTIME.block_on(async move {
handle.abort();
let _ = handle.await;
});
}
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
@ -661,6 +683,7 @@ impl SignallableImpl for Signaller {
if let Some(handle) = receive_task_handle {
handle.abort();
let _ = handle.await;
}
});
}
@ -720,16 +743,21 @@ impl SignallableImpl for Signaller {
let state = self.state.lock().unwrap();
let session_id = session_id.to_string();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id,
}))
.await
{
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)]
self,
async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id,
}))
.await
{
this.obj()
.emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}
}));
));
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -406,8 +406,13 @@ impl Session {
}
let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src)
.proxy_pad_chain_function(glib::clone!(@weak element, @strong self.id as sess_id => @default-panic, move
|pad, parent, buffer| {
.proxy_pad_chain_function(glib::clone!(
#[weak]
element,
#[strong(rename_to = sess_id)]
self.id,
#[upgrade_or_panic]
move |pad, parent, buffer| {
let padret = gst::ProxyPad::chain_default(pad, parent, buffer);
let state = element.imp().state.lock().unwrap();
let Some(session) = state.sessions.get(&sess_id) else {
@ -418,30 +423,44 @@ impl Session {
f
}
))
.proxy_pad_event_function(glib::clone!(@weak element , @weak webrtcbin_pad as webrtcpad, @strong self.id as sess_id => @default-panic, move |pad, parent, event| {
let event = if let gst::EventView::StreamStart(stream_start) = event.view() {
let state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get(&sess_id) {
session.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element)
.map(|srcpad| {
gst::event::StreamStart::builder(&srcpad.imp().stream_id())
.seqnum(stream_start.seqnum())
.group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next))
.build()
}).unwrap_or(event)
.proxy_pad_event_function(glib::clone!(
#[weak]
element,
#[weak(rename_to = webrtcpad)]
webrtcbin_pad,
#[strong(rename_to = sess_id)]
self.id,
#[upgrade_or_panic]
move |pad, parent, event| {
let event = if let gst::EventView::StreamStart(stream_start) = event.view() {
let state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get(&sess_id) {
session
.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element)
.map(|srcpad| {
gst::event::StreamStart::builder(&srcpad.imp().stream_id())
.seqnum(stream_start.seqnum())
.group_id(
stream_start
.group_id()
.unwrap_or_else(gst::GroupId::next),
)
.build()
})
.unwrap_or(event)
} else {
gst::error!(CAT, obj: element , "session {sess_id:?} does not exist");
event
}
} else {
gst::error!(CAT, obj: element , "session {sess_id:?} does not exist");
event
}
} else {
event
};
};
gst::Pad::event_default(pad, parent, event)
}))
gst::Pad::event_default(pad, parent, event)
}
))
.build();
let sess_id = self.id.clone();
if element
.imp()
.settings
@ -451,23 +470,34 @@ impl Session {
{
webrtcbin_pad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(@weak element => @default-panic, move |_pad, info| {
let Some(ev) = info.event() else {
return gst::PadProbeReturn::Ok;
};
if ev.type_() != gst::EventType::Navigation {
return gst::PadProbeReturn::Ok;
};
glib::clone!(
#[weak]
element,
#[strong(rename_to = sess_id)]
self.id,
#[upgrade_or]
gst::PadProbeReturn::Remove,
move |_pad, info| {
let Some(ev) = info.event() else {
return gst::PadProbeReturn::Ok;
};
if ev.type_() != gst::EventType::Navigation {
return gst::PadProbeReturn::Ok;
};
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&sess_id) {
session.send_navigation_event(gst_video::NavigationEvent::parse(ev).unwrap(), &element);
} else {
gst::error!(CAT, obj: element , "session {sess_id:?} does not exist");
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&sess_id) {
session.send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&element,
);
} else {
gst::error!(CAT, obj: element , "session {sess_id:?} does not exist");
}
gst::PadProbeReturn::Ok
}
gst::PadProbeReturn::Ok
}),
),
);
}
@ -502,15 +532,17 @@ impl Session {
.expect("decodebin3 needs to be present!");
bin.add(&decodebin).unwrap();
decodebin.sync_state_with_parent().unwrap();
decodebin.connect_pad_added(
glib::clone!(@weak element as this, @weak ghostpad as ghostpad => move |_webrtcbin, pad| {
decodebin.connect_pad_added(glib::clone!(
#[weak]
ghostpad,
move |_webrtcbin, pad| {
if pad.direction() == gst::PadDirection::Sink {
return;
}
ghostpad.set_target(Some(pad)).unwrap();
}),
);
}
));
gst::debug!(CAT, obj: element, "Decoding for {}", srcpad.imp().stream_id());
@ -684,20 +716,21 @@ impl Session {
gst::info!(CAT, obj: element, "Set remote description");
let obj = element.clone();
let session_id = self.id.clone();
let promise =
gst::Promise::with_change_func(glib::clone!(@weak element as ele => move |reply| {
let state = ele.imp().state.lock().unwrap();
gst::info!(CAT, obj: ele, "got answer for session {session_id:?}");
let Some(session) = state.sessions.get(&session_id) else {
gst::error!(CAT, obj: ele , "no session {session_id:?}");
return
};
session.on_answer_created(reply, &obj);
}
));
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak]
element,
#[strong(rename_to = session_id)]
self.id,
move |reply| {
let state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj: element, "got answer for session {session_id:?}");
let Some(session) = state.sessions.get(&session_id) else {
gst::error!(CAT, obj: element , "no session {session_id:?}");
return;
};
session.on_answer_created(reply, &element);
}
));
// We cannot emit `create-answer` from here. The promise function
// of the answer needs the state lock which is held by the caller
@ -843,7 +876,7 @@ impl BaseWebRTCSrc {
error: signaller.connect_closure(
"error",
false,
glib::closure!(@watch instance => move |
glib::closure!(#[watch] instance, move |
_signaller: glib::Object, error: String| {
gst::element_error!(
instance,
@ -856,7 +889,7 @@ impl BaseWebRTCSrc {
session_started: signaller.connect_closure(
"session-started",
false,
glib::closure!(@watch instance => move |
glib::closure!(#[watch] instance, move |
_signaller: glib::Object,
session_id: &str,
_peer_id: &str| {
@ -869,7 +902,7 @@ impl BaseWebRTCSrc {
session_ended: signaller.connect_closure(
"session-ended",
false,
glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{
glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str|{
let this = instance.imp();
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
@ -900,7 +933,7 @@ impl BaseWebRTCSrc {
request_meta: signaller.connect_closure(
"request-meta",
false,
glib::closure!(@watch instance => move |
glib::closure!(#[watch] instance, move |
_signaller: glib::Object| -> Option<gst::Structure> {
instance.imp().settings.lock().unwrap().meta.clone()
}),
@ -909,7 +942,7 @@ impl BaseWebRTCSrc {
session_description: signaller.connect_closure(
"session-description",
false,
glib::closure!(@watch instance => move |
glib::closure!(#[watch] instance, move |
_signaller: glib::Object,
session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription| {
@ -934,7 +967,7 @@ impl BaseWebRTCSrc {
handle_ice: signaller.connect_closure(
"handle-ice",
false,
glib::closure!(@watch instance => move |
glib::closure!(#[watch] instance, move |
_signaller: glib::Object,
session_id: &str,
sdp_m_line_index: u32,
@ -1079,90 +1112,110 @@ impl BaseWebRTCSrc {
let bin = gst::Bin::new();
bin.connect_pad_removed(
glib::clone!(@weak self as this, @to-owned session_id => move |_, pad|
bin.connect_pad_removed(glib::clone!(
#[weak(rename_to = this)]
self,
#[to_owned]
session_id,
move |_, pad| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::warning!(CAT, imp: this, "session {session_id:?} not found");
return
return;
};
session.flow_combiner.lock().unwrap().remove_pad(pad);
),
);
bin.connect_pad_added(
glib::clone!(@weak self as this, @to-owned session_id => move |_, pad|
}
));
bin.connect_pad_added(glib::clone!(
#[weak(rename_to = this)]
self,
#[to_owned]
session_id,
move |_, pad| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::warning!(CAT, imp: this, "session {session_id:?} not found");
return
return;
};
session.flow_combiner.lock().unwrap().add_pad(pad);
}
));
),
);
webrtcbin.connect_pad_added(
glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| {
webrtcbin.connect_pad_added(glib::clone!(
#[weak(rename_to = this)]
self,
#[weak]
bin,
#[to_owned]
session_id,
move |_webrtcbin, pad| {
if pad.direction() == gst::PadDirection::Sink {
return;
}
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
return;
};
let bin_ghostpad = session.handle_webrtc_src_pad(&bin, pad, &this.obj());
drop(state);
bin.add_pad(&bin_ghostpad)
.expect("Adding ghostpad to the bin should always work");
}),
);
}
));
webrtcbin.connect_pad_removed(
glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return
};
session.flow_combiner.lock().unwrap().remove_pad(pad);
}),
);
webrtcbin.connect_pad_removed(glib::clone!(
#[weak(rename_to = this)]
self,
#[to_owned]
session_id,
move |_webrtcbin, pad| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return;
};
session.flow_combiner.lock().unwrap().remove_pad(pad);
}
));
webrtcbin.connect_closure(
"on-ice-candidate",
false,
glib::closure!(@weak-allow-none self as this, @to-owned session_id => move |
_webrtcbin: gst::Bin,
sdp_m_line_index: u32,
candidate: String| {
if let Some(ele) = this {
let mut state = ele.state.lock().unwrap();
glib::closure!(
#[weak(rename_to = this)]
self,
#[to_owned]
session_id,
move |_webrtcbin: gst::Bin, sdp_m_line_index: u32, candidate: String| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, imp: ele, "session {session_id:?} not found");
return
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return;
};
session.on_ice_candidate(sdp_m_line_index, candidate, &ele.obj());
session.on_ice_candidate(sdp_m_line_index, candidate, &this.obj());
}
}),
),
);
webrtcbin.connect_closure(
"on-data-channel",
false,
glib::closure!(@weak-allow-none self as this, @to-owned session_id => move |
_webrtcbin: gst::Bin,
data_channel: glib::Object| {
if let Some(ele) = this {
let mut state = ele.state.lock().unwrap();
glib::closure!(
#[weak(rename_to = this)]
self,
#[to_owned]
session_id,
move |_webrtcbin: gst::Bin, data_channel: glib::Object| {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(&session_id) else {
gst::error!(CAT, imp: ele, "session {session_id:?} not found");
return
gst::error!(CAT, imp: this, "session {session_id:?} not found");
return;
};
session.on_data_channel(data_channel, &ele.obj());
session.on_data_channel(data_channel, &this.obj());
}
}),
),
);
bin.add(&webrtcbin).unwrap();

View file

@ -453,32 +453,34 @@ impl SignallableImpl for WhipClient {
glib::closure!(|signaller: &super::WhipClientSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let Some(obj) = obj_weak.upgrade() else {
return;
};
webrtcbin.connect_notify(
Some("ice-gathering-state"),
glib::clone!(
#[weak]
signaller,
move |webrtcbin, _pspec| {
let state = webrtcbin
.property::<WebRTCICEGatheringState>("ice-gathering-state");
let state =
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: signaller, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: signaller, "ICE gathering complete");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let webrtcbin = webrtcbin.clone();
let webrtcbin = webrtcbin.clone();
RUNTIME.spawn(async move {
RUNTIME.spawn(async move {
/* Note that we check for a valid WHIP endpoint in change_state */
obj.imp().send_offer(&webrtcbin).await
signaller.imp().send_offer(&webrtcbin).await
});
}
_ => (),
}
}
_ => (),
}
});
),
);
}),
);
@ -657,50 +659,48 @@ impl WhipServer {
glib::closure!(|signaller: &super::WhipServerSignaller,
_producer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
webrtcbin.connect_notify(
Some("ice-gathering-state"),
glib::clone!(
#[weak]
signaller,
move |webrtcbin, _pspec| {
let state =
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let mut settings = obj.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description")
{
ans = Some(answer_desc.sdp().to_owned());
} else {
ans = None;
}
let tx = settings
.sdp_answer
.take()
.expect("SDP answer Sender needs to be valid");
let obj_weak = obj.downgrade();
RUNTIME.spawn(async move {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: signaller, "ICE gathering started");
}
});
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: signaller, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let mut settings = signaller.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin
.property::<Option<WebRTCSessionDescription>>(
"local-description",
)
{
ans = Some(answer_desc.sdp().to_owned());
} else {
ans = None;
}
let tx = settings
.sdp_answer
.take()
.expect("SDP answer Sender needs to be valid");
RUNTIME.spawn(glib::clone!(#[strong] signaller, async move {
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: signaller, "Failed to send SDP {e}");
}
}));
}
_ => (),
}
}
_ => (),
}
});
),
);
})
}
@ -940,37 +940,29 @@ impl WhipServer {
let prefix = warp::path(ROOT);
let self_weak = self.downgrade();
// POST /endpoint
let post_filter = warp::post()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
.and(warp::body::bytes())
.and_then(move |body| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.post_handler(body).await
}
});
let self_weak = self.downgrade();
.and_then(glib::clone!(
#[weak(rename_to = self_)]
self,
#[upgrade_or_panic]
move |body| async move { self_.post_handler(body).await }
));
// OPTIONS /endpoint
let options_filter = warp::options()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and_then(move || {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.options_handler().await
}
});
let self_weak = self.downgrade();
.and_then(glib::clone!(
#[weak(rename_to = self_)]
self,
#[upgrade_or_panic]
move || async move { self_.options_handler().await }
));
// PATCH /resource/:id
let patch_filter = warp::patch()
@ -981,28 +973,24 @@ impl WhipServer {
CONTENT_TYPE.as_str(),
CONTENT_TRICKLE_ICE,
))
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.patch_handler(id).await
}
});
let self_weak = self.downgrade();
.and_then(glib::clone!(
#[weak(rename_to = self_)]
self,
#[upgrade_or_panic]
move |id| async move { self_.patch_handler(id).await }
));
// DELETE /resource/:id
let delete_filter = warp::delete()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.delete_handler(id).await
}
});
.and_then(glib::clone!(
#[weak(rename_to = self_)]
self,
#[upgrade_or_panic]
move |id| async move { self_.delete_handler(id).await }
));
let api = prefix
.and(post_filter)