diff --git a/Cargo.lock b/Cargo.lock index 6b0d41c3..2c9c641d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -981,7 +981,7 @@ dependencies = [ [[package]] name = "cairo-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "bitflags 2.5.0", "cairo-sys-rs", @@ -993,11 +993,11 @@ dependencies = [ [[package]] name = "cairo-sys-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -1497,7 +1497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ecb1c5e8f4dc438eedc1b534a54672fb0e0a56035dae6b50162787bd2c50e95" dependencies = [ "libc", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -1933,7 +1933,7 @@ dependencies = [ [[package]] name = "gdk-pixbuf" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "gdk-pixbuf-sys", "gio", @@ -1944,13 +1944,13 @@ dependencies = [ [[package]] name = "gdk-pixbuf-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "gio-sys", "glib-sys", "gobject-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -1980,7 +1980,7 @@ dependencies = [ "libc", "pango-sys", "pkg-config", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -2002,7 +2002,7 @@ source = "git+https://github.com/gtk-rs/gtk4-rs?branch=master#f7ff7a0c8f4b3b892e dependencies = [ "glib-sys", "libc", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -2026,7 +2026,7 @@ dependencies = [ "gdk4-sys", "glib-sys", "libc", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -2049,7 +2049,7 @@ dependencies = [ "gdk4-sys", "glib-sys", "libc", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -2103,7 +2103,7 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "gio" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "futures-channel", "futures-core", @@ -2120,19 +2120,19 @@ dependencies = [ [[package]] name = "gio-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib-sys", "gobject-sys", "libc", - "system-deps", + "system-deps 7.0.1", "windows-sys 0.52.0", ] [[package]] name = "glib" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "bitflags 2.5.0", "futures-channel", @@ -2153,7 +2153,7 @@ dependencies = [ [[package]] name = "glib-macros" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "heck 0.5.0", "proc-macro-crate", @@ -2165,10 +2165,10 @@ dependencies = [ [[package]] name = "glib-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -2180,17 +2180,17 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gobject-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "graphene-rs" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib", "graphene-sys", @@ -2200,12 +2200,12 @@ dependencies = [ [[package]] name = "graphene-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib-sys", "libc", "pkg-config", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -2245,7 +2245,7 @@ dependencies = [ "graphene-sys", "libc", "pango-sys", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -3122,7 +3122,7 @@ dependencies = [ [[package]] name = "gstreamer" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "cfg-if", "futures-channel", @@ -3148,7 +3148,7 @@ dependencies = [ [[package]] name = "gstreamer-allocators" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3160,19 +3160,19 @@ dependencies = [ [[package]] name = "gstreamer-allocators-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-app" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "futures-core", "futures-sink", @@ -3186,19 +3186,19 @@ dependencies = [ [[package]] name = "gstreamer-app-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-base-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-audio" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "cfg-if", "glib", @@ -3214,20 +3214,20 @@ dependencies = [ [[package]] name = "gstreamer-audio-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "gstreamer-base-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-base" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "atomic_refcell", "cfg-if", @@ -3240,19 +3240,19 @@ dependencies = [ [[package]] name = "gstreamer-base-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-check" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3262,19 +3262,19 @@ dependencies = [ [[package]] name = "gstreamer-check-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-gl" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3288,7 +3288,7 @@ dependencies = [ [[package]] name = "gstreamer-gl-egl" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3300,18 +3300,18 @@ dependencies = [ [[package]] name = "gstreamer-gl-egl-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-gl-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-gl-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", @@ -3319,13 +3319,13 @@ dependencies = [ "gstreamer-sys", "gstreamer-video-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-gl-wayland" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3337,18 +3337,18 @@ dependencies = [ [[package]] name = "gstreamer-gl-wayland-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-gl-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-gl-x11" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3360,18 +3360,18 @@ dependencies = [ [[package]] name = "gstreamer-gl-x11-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-gl-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-net" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "gio", "glib", @@ -3382,19 +3382,19 @@ dependencies = [ [[package]] name = "gstreamer-net-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "gio-sys", "glib-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-pbutils" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3408,7 +3408,7 @@ dependencies = [ [[package]] name = "gstreamer-pbutils-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", @@ -3416,13 +3416,13 @@ dependencies = [ "gstreamer-sys", "gstreamer-video-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-rtp" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3433,19 +3433,19 @@ dependencies = [ [[package]] name = "gstreamer-rtp-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-base-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-sdp" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3455,29 +3455,29 @@ dependencies = [ [[package]] name = "gstreamer-sdp-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-utils" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "gstreamer", "gstreamer-app", @@ -3489,7 +3489,7 @@ dependencies = [ [[package]] name = "gstreamer-video" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "cfg-if", "futures-channel", @@ -3506,20 +3506,20 @@ dependencies = [ [[package]] name = "gstreamer-video-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gobject-sys", "gstreamer-base-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "gstreamer-webrtc" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib", "gstreamer", @@ -3531,13 +3531,13 @@ dependencies = [ [[package]] name = "gstreamer-webrtc-sys" version = "0.23.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#0be8b364f8ea871ca5bd1f4d2ad6fa5cee5e437a" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs?branch=main#cfb0fe6a17254eef20abe2584f975ab9f4dc26a4" dependencies = [ "glib-sys", "gstreamer-sdp-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -3586,7 +3586,7 @@ dependencies = [ "gsk4-sys", "libc", "pango-sys", - "system-deps", + "system-deps 6.2.2", ] [[package]] @@ -4397,7 +4397,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -5149,7 +5149,7 @@ dependencies = [ [[package]] name = "pango" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "gio", "glib", @@ -5160,18 +5160,18 @@ dependencies = [ [[package]] name = "pango-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "glib-sys", "gobject-sys", "libc", - "system-deps", + "system-deps 7.0.1", ] [[package]] name = "pangocairo" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "cairo-rs", "glib", @@ -5183,13 +5183,13 @@ dependencies = [ [[package]] name = "pangocairo-sys" version = "0.20.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#d88a7c473e2af36a4718a648406b7894d288ec1b" +source = "git+https://github.com/gtk-rs/gtk-rs-core?branch=master#dbfaebdf8c156f2af843479060e5c6fa08e0ad50" dependencies = [ "cairo-sys-rs", "glib-sys", "libc", "pango-sys", - "system-deps", + "system-deps 7.0.1", ] [[package]] @@ -5526,7 +5526,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.12.1", "log", "multimap 0.10.0", @@ -5747,7 +5747,7 @@ dependencies = [ "rand", "rand_chacha", "simd_helpers", - "system-deps", + "system-deps 6.2.2", "thiserror", "v_frame", ] @@ -6741,6 +6741,19 @@ dependencies = [ "version-compare", ] +[[package]] +name = "system-deps" +version = "7.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c81f13d9a334a6c242465140bd262fae382b752ff2011c4f7419919a9c97922" +dependencies = [ + "cfg-expr", + "heck 0.5.0", + "pkg-config", + "toml", + "version-compare", +] + [[package]] name = "target-lexicon" version = "0.12.14" diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index 7f75c6e9..4b380edb 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -84,13 +84,16 @@ fn main() { .property("signal-handoffs", true) .build() .unwrap(); - sink.connect( + sink.connect_closure( "handoff", true, - glib::clone!(@strong counter => move |_| { - let _ = counter.fetch_add(1, Ordering::SeqCst); - None - }), + glib::closure!( + #[strong] + counter, + move |_fakesink: &gst::Element, _buffer: &gst::Buffer, _pad: &gst::Pad| { + let _ = counter.fetch_add(1, Ordering::SeqCst); + } + ), ); let (source, context) = match source.as_str() { diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs index e18713e7..78277539 100644 --- a/net/webrtc/src/janusvr_signaller/imp.rs +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -334,8 +334,10 @@ impl Signaller { // 1000 is completely arbitrary, we simply don't want infinite piling // up of messages as with unbounded let (ws_sender, mut ws_receiver) = mpsc::channel::(1000); - let send_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let send_task_handle = RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { let mut res = Ok(()); loop { tokio::select! { @@ -349,7 +351,6 @@ impl Signaller { None => break, }, _ = tokio::time::sleep(Duration::from_secs(10)) => { - if let Some(ref this) = this { let (transaction, session_id, apisecret) = { let state = this.state.lock().unwrap(); let settings = this.settings.lock().unwrap(); @@ -368,45 +369,37 @@ impl Signaller { res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) .await; - } } } if let Err(ref err) = res { - this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send task: {err}"), - |this| gst::error!(CAT, imp: this, "Quitting send task: {err}") - ); - + gst::error!(CAT, imp: this, "Quitting send task: {err}"); break; } } - this.map_or_else(|| gst::debug!(CAT, "Done sending"), - |this| gst::debug!(CAT, imp: this, "Done sending") - ); + gst::debug!(CAT, imp: this, "Done sending"); let _ = ws_sink.close().await; res.map_err(Into::into) - })); + } + )); - let recv_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let recv_task_handle = RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { - if let Some(ref this) = this { - if let ControlFlow::Break(_) = this.handle_msg(msg) { - break; - } - } else { + if let ControlFlow::Break(_) = this.handle_msg(msg) { break; } } let msg = "Stopped websocket receiving"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") - ); - })); + gst::info!(CAT, imp: this, "{msg}"); + } + )); let mut state = self.state.lock().unwrap(); state.ws_sender = Some(ws_sender); @@ -531,11 +524,15 @@ impl Signaller { fn send(&self, msg: OutgoingMessage) { let state = self.state.lock().unwrap(); if let Some(mut sender) = state.ws_sender.clone() { - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = sender.send(msg).await { - this.raise_error(err.to_string()); + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { + if let Err(err) = sender.send(msg).await { + this.raise_error(err.to_string()); + } } - })); + )); } } @@ -545,11 +542,11 @@ impl Signaller { fn send_blocking(&self, msg: OutgoingMessage) { let state = self.state.lock().unwrap(); if let Some(mut sender) = state.ws_sender.clone() { - RUNTIME.block_on(glib::clone!(@weak self as this => async move { + RUNTIME.block_on(async { if let Err(err) = sender.send(msg).await { - this.raise_error(err.to_string()); + self.raise_error(err.to_string()); } - })); + }); } } diff --git a/net/webrtc/src/livekit_signaller/imp.rs b/net/webrtc/src/livekit_signaller/imp.rs index 9946c4ce..9f5f05f2 100644 --- a/net/webrtc/src/livekit_signaller/imp.rs +++ b/net/webrtc/src/livekit_signaller/imp.rs @@ -589,9 +589,7 @@ impl SignallableImpl for Signaller { let weak_imp = self.downgrade(); RUNTIME.spawn(async move { - let imp = if let Some(imp) = weak_imp.upgrade() { - imp - } else { + let Some(imp) = weak_imp.upgrade() else { return; }; @@ -634,35 +632,39 @@ impl SignallableImpl for Signaller { } } - let weak_imp = imp.downgrade(); imp.obj().connect_closure( "webrtcbin-ready", false, - glib::closure!(|_signaller: &super::LiveKitSignaller, - _consumer_identifier: &str, - webrtcbin: &gst::Element| { - gst::info!(CAT, "Adding data channels"); - let reliable_channel = webrtcbin.emit_by_name::( - "create-data-channel", - &[ - &"_reliable", - &gst::Structure::builder("config") - .field("ordered", true) - .build(), - ], - ); - let lossy_channel = webrtcbin.emit_by_name::( - "create-data-channel", - &[ - &"_lossy", - &gst::Structure::builder("config") - .field("ordered", true) - .field("max-retransmits", 0) - .build(), - ], - ); + glib::closure!( + #[watch(rename_to = obj)] + imp.obj(), + move |_signaller: &super::LiveKitSignaller, + _consumer_identifier: &str, + webrtcbin: &gst::Element| { + let imp = obj.imp(); + gst::info!(CAT, "Adding data channels"); + let reliable_channel = webrtcbin + .emit_by_name::( + "create-data-channel", + &[ + &"_reliable", + &gst::Structure::builder("config") + .field("ordered", true) + .build(), + ], + ); + let lossy_channel = webrtcbin + .emit_by_name::( + "create-data-channel", + &[ + &"_lossy", + &gst::Structure::builder("config") + .field("ordered", true) + .field("max-retransmits", 0) + .build(), + ], + ); - if let Some(imp) = weak_imp.upgrade() { let mut connection = imp.connection.lock().unwrap(); if let Some(connection) = connection.as_mut() { connection.channels = Some(Channels { @@ -671,7 +673,7 @@ impl SignallableImpl for Signaller { }); } } - }), + ), ); let connection = Connection { diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 87636c19..dc45a81d 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -68,6 +68,7 @@ pub struct Signaller { struct State { /// Sender for the websocket messages websocket_sender: Option>, + connect_task_handle: Option>, send_task_handle: Option>>, receive_task_handle: Option>, producers: HashSet, @@ -173,8 +174,10 @@ impl Signaller { // 1000 is completely arbitrary, we simply don't want infinite piling // up of messages as with unbounded let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); - let send_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let send_task_handle = RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg); @@ -183,21 +186,18 @@ impl Signaller { .await; if let Err(ref err) = res { - this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"), - |this| gst::error!(CAT, imp: this, "Quitting send loop: {err}") - ); + gst::error!(CAT, imp: this, "Quitting send loop: {err}"); break; } } - this.map_or_else(|| gst::debug!(CAT, "Done sending"), - |this| gst::debug!(CAT, imp: this, "Done sending") - ); + gst::debug!(CAT, imp: this, "Done sending"); let _ = ws_sink.close().await; res.map_err(Into::into) - })); + } + )); let obj = self.obj(); let meta = @@ -207,23 +207,20 @@ impl Signaller { None }; - let receive_task_handle = - RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + let receive_task_handle = RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { - if let Some(ref this) = this { - if let ControlFlow::Break(_) = this.handle_message(msg, &meta) { - break; - } - } else { + if let ControlFlow::Break(_) = this.handle_message(msg, &meta) { break; } } let msg = "Stopped websocket receiving"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") - ); - })); + gst::info!(CAT, imp: this, "{msg}"); + } + )); let mut state = self.state.lock().unwrap(); state.websocket_sender = Some(websocket_sender); @@ -297,11 +294,16 @@ impl Signaller { fn send(&self, msg: p::IncomingMessage) { let state = self.state.lock().unwrap(); if let Some(mut sender) = state.websocket_sender.clone() { - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = sender.send(msg).await { - this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { + if let Err(err) = sender.send(msg).await { + this.obj() + .emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + } } - })); + )); } } @@ -636,17 +638,37 @@ impl ObjectImpl for Signaller { impl SignallableImpl for Signaller { fn start(&self) { gst::info!(CAT, imp: self, "Starting"); - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = this.connect().await { - this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + + let mut state = self.state.lock().unwrap(); + let connect_task_handle = RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { + if let Err(err) = this.connect().await { + this.obj() + .emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + } } - })); + )); + + state.connect_task_handle = Some(connect_task_handle); } fn stop(&self) { gst::info!(CAT, imp: self, "Stopping now"); let mut state = self.state.lock().unwrap(); + + // First make sure the connect task is stopped if it is still + // running + let connect_task_handle = state.connect_task_handle.take(); + if let Some(handle) = connect_task_handle { + RUNTIME.block_on(async move { + handle.abort(); + let _ = handle.await; + }); + } + let send_task_handle = state.send_task_handle.take(); let receive_task_handle = state.receive_task_handle.take(); if let Some(mut sender) = state.websocket_sender.take() { @@ -661,6 +683,7 @@ impl SignallableImpl for Signaller { if let Some(handle) = receive_task_handle { handle.abort(); + let _ = handle.await; } }); } @@ -720,16 +743,21 @@ impl SignallableImpl for Signaller { let state = self.state.lock().unwrap(); let session_id = session_id.to_string(); if let Some(mut sender) = state.websocket_sender.clone() { - RUNTIME.spawn(glib::clone!(@weak self as this => async move { - if let Err(err) = sender - .send(p::IncomingMessage::EndSession(p::EndSessionMessage { - session_id, - })) - .await - { - this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + async move { + if let Err(err) = sender + .send(p::IncomingMessage::EndSession(p::EndSessionMessage { + session_id, + })) + .await + { + this.obj() + .emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + } } - })); + )); } } } diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3ae185fe..e00e001b 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -140,11 +140,11 @@ impl CustomBusStream { match msg.view() { gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { if let Some(bin) = bin_weak.upgrade() { - let _ = bin.post_message(msg.to_owned()); + let _ = bin.post_message(msg.clone()); } } _ => { - let _ = sender.unbounded_send(msg.to_owned()); + let _ = sender.unbounded_send(msg.clone()); } } @@ -364,7 +364,7 @@ impl SessionWrapper { /// # Panics /// /// Panics is the `Session` is already in place. - fn restore(&mut self, session: Session) { + fn restore(&mut self, element: &super::BaseWebRTCSink, session: Session) { let SessionWrapper::Taken(ref cands) = self else { panic!("Session is already in place"); }; @@ -372,6 +372,7 @@ impl SessionWrapper { if !cands.is_empty() { gst::trace!( CAT, + obj: element, "handling {} pending ice candidates for session {}", cands.len(), session.id, @@ -391,16 +392,22 @@ impl SessionWrapper { /// /// If the `Session` is in place, the ICE candidate is added immediately, /// otherwise, it will be added when the `Session` is restored. - fn add_ice_candidate(&mut self, session_id: &str, sdp_m_line_index: u32, candidate: &str) { + fn add_ice_candidate( + &mut self, + element: &super::BaseWebRTCSink, + session_id: &str, + sdp_m_line_index: u32, + candidate: &str, + ) { match self { SessionWrapper::InPlace(session) => { - gst::trace!(CAT, "adding ice candidate for session {session_id}"); + gst::trace!(CAT, obj: element, "adding ice candidate for session {session_id}"); session .webrtcbin .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); } SessionWrapper::Taken(cands) => { - gst::trace!(CAT, "queuing ice candidate for session {session_id}"); + gst::trace!(CAT, obj: element, "queueing ice candidate for session {session_id}"); cands.push(IceCandidate { sdp_m_line_index, candidate: candidate.to_string(), @@ -453,7 +460,7 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { let event = gst::event::Navigation::new(event.event.structure()); if !stream.sink_pad.push_event(event.clone()) { - gst::info!(CAT, "Could not send event: {:?}", event); + gst::info!(CAT, obj: sink, "Could not send event: {:?}", event); } } } @@ -466,13 +473,13 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { if stream.sink_pad.name().starts_with("video_") { gst::log!(CAT, "Navigating to: {:?}", event); if !stream.sink_pad.push_event(event.clone()) { - gst::info!(CAT, "Could not send event: {:?}", event); + gst::info!(CAT, obj: sink, "Could not send event: {:?}", event); } } }); } } else { - gst::error!(CAT, "Invalid navigation event: {:?}", msg); + gst::error!(CAT, obj: sink, "Invalid navigation event: {:?}", msg); } } @@ -1113,7 +1120,7 @@ impl VideoEncoder { } impl State { - fn finalize_session(&mut self, session: &mut Session) { + fn finalize_session(&mut self, element: &super::BaseWebRTCSink, session: &mut Session) { gst::info!(CAT, "Ending session {}", session.id); session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), @@ -1131,6 +1138,7 @@ impl State { let (sessions, _cvar) = &*finalizing_sessions; sessions.lock().unwrap().insert(session_id.clone()); + let element = element.clone(); let pipeline = session.pipeline.clone(); RUNTIME.spawn_blocking(move || { if let Some(stats_collection_handle) = stats_collection_handle { @@ -1146,14 +1154,18 @@ impl State { sessions.remove(&session_id); cvar.notify_one(); - gst::debug!(CAT, "Session {session_id} ended"); + gst::debug!(CAT, obj: element, "Session {session_id} ended"); }); } - fn end_session(&mut self, session_id: &str) -> Option { + fn end_session( + &mut self, + element: &super::BaseWebRTCSink, + session_id: &str, + ) -> Option { if let Some(session) = self.sessions.remove(session_id) { let mut session = session.into_inner(); - self.finalize_session(&mut session); + self.finalize_session(element, &mut session); Some(session) } else { None @@ -1505,7 +1517,7 @@ impl InputStream { impl NavigationEventHandler { fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { - gst::info!(CAT, "Creating navigation data channel"); + gst::info!(CAT, obj: element, "Creating navigation data channel"); let channel = webrtcbin.emit_by_name::( "create-data-channel", &[ @@ -1516,17 +1528,18 @@ impl NavigationEventHandler { ], ); - let weak_element = element.downgrade(); Self(( - channel.connect("on-message-string", false, move |values| { - if let Some(element) = weak_element.upgrade() { - let _channel = values[0].get::().unwrap(); - let msg = values[1].get::<&str>().unwrap(); - create_navigation_event(&element, msg); - } - - None - }), + channel.connect_closure( + "on-message-string", + false, + glib::closure!( + #[watch] + element, + move |_channel: &WebRTCDataChannel, msg: &str| { + create_navigation_event(element, msg); + } + ), + ), channel, )) } @@ -1617,7 +1630,7 @@ impl BaseWebRTCSink { // GstRTPBasePayload::extensions property is only available since GStreamer 1.24 if !payloader.has_property("extensions", Some(gst::Array::static_type())) { if self.has_connected_payloader_setup_slots() { - gst::warning!(CAT, "'extensions' property is not available: TWCC extension ID will default to 1. \ + gst::warning!(CAT, imp: self, "'extensions' property is not available: TWCC extension ID will default to 1. \ Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \ Please consider updating GStreamer to 1.24."); } @@ -1734,33 +1747,30 @@ impl BaseWebRTCSink { self.configure_congestion_control(payloader, codec, extension_configuration_type) } - fn generate_ssrc( - element: &super::BaseWebRTCSink, - webrtc_pads: &HashMap, - ) -> u32 { + fn generate_ssrc(&self, webrtc_pads: &HashMap) -> u32 { loop { let ret = fastrand::u32(..); if !webrtc_pads.contains_key(&ret) { - gst::trace!(CAT, obj: element, "Selected ssrc {}", ret); + gst::trace!(CAT, imp: self, "Selected ssrc {}", ret); return ret; } } } fn request_inactive_webrtcbin_pad( - element: &super::BaseWebRTCSink, + &self, webrtcbin: &gst::Element, webrtc_pads: &mut HashMap, is_video: bool, ) { - let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads); + let ssrc = self.generate_ssrc(webrtc_pads); let media_idx = webrtc_pads.len() as i32; let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else { - gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin"); - gst::element_error!( - element, + gst::error!(CAT, imp: self, "Failed to request pad from webrtcbin"); + gst::element_imp_error!( + self, gst::StreamError::Failed, ["Failed to request pad from webrtcbin"] ); @@ -1794,7 +1804,7 @@ impl BaseWebRTCSink { } async fn request_webrtcbin_pad( - element: &super::BaseWebRTCSink, + &self, webrtcbin: &gst::Element, stream: &mut InputStream, media: Option<&gst_sdp::SDPMediaRef>, @@ -1802,28 +1812,28 @@ impl BaseWebRTCSink { webrtc_pads: &mut HashMap, codecs: &mut BTreeMap, ) { - let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads); + let ssrc = self.generate_ssrc(webrtc_pads); let media_idx = webrtc_pads.len() as i32; let mut payloader_caps = match media { Some(media) => { let discovery_info = stream.create_discovery(); - let codec = BaseWebRTCSink::select_codec( - element, - &discovery_info, - media, - &stream.in_caps.as_ref().unwrap().clone(), - &stream.sink_pad.name(), - settings, - ) - .await; + let codec = self + .select_codec( + &discovery_info, + media, + &stream.in_caps.as_ref().unwrap().clone(), + &stream.sink_pad.name(), + settings, + ) + .await; match codec { Some(codec) => { gst::debug!( CAT, - obj: element, + imp: self, "Selected {codec:?} for media {media_idx}" ); @@ -1831,7 +1841,7 @@ impl BaseWebRTCSink { codec.output_filter().unwrap() } None => { - gst::error!(CAT, obj: element, "No codec selected for media {media_idx}"); + gst::error!(CAT, imp: self, "No codec selected for media {media_idx}"); gst::Caps::new_empty() } @@ -1841,22 +1851,20 @@ impl BaseWebRTCSink { }; if payloader_caps.is_empty() { - BaseWebRTCSink::request_inactive_webrtcbin_pad( - element, - webrtcbin, - webrtc_pads, - stream.is_video, - ); + self.request_inactive_webrtcbin_pad(webrtcbin, webrtc_pads, stream.is_video); } else { let payloader_caps_mut = payloader_caps.make_mut(); payloader_caps_mut.set("ssrc", ssrc); - if element.imp().settings.lock().unwrap().do_clock_signalling { + if self.settings.lock().unwrap().do_clock_signalling { // Add RFC7273 attributes when using an NTP or PTP clock - let clock = element.clock().expect("element added and pipeline playing"); + let clock = self + .obj() + .clock() + .expect("element added and pipeline playing"); let ts_refclk = if clock.is::() { - gst::debug!(CAT, obj: element, "Found NTP clock"); + gst::debug!(CAT, imp: self, "Found NTP clock"); let addr = clock.property::("address"); let port = clock.property::("port"); @@ -1867,7 +1875,7 @@ impl BaseWebRTCSink { format!("ntp={addr}:{port}") }) } else if clock.is::() { - gst::debug!(CAT, obj: element, "Found PTP clock"); + gst::debug!(CAT, imp: self, "Found PTP clock"); let clock_id = clock.property::("grandmaster-clock-id"); let domain = clock.property::("domain"); @@ -1905,15 +1913,15 @@ impl BaseWebRTCSink { gst::info!( CAT, - obj: element, + imp: self, "Requesting WebRTC pad with caps {}", payloader_caps ); let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else { - gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin"); - gst::element_error!( - element, + gst::error!(CAT, imp: self, "Failed to request pad from webrtcbin"); + gst::element_imp_error!( + self, gst::StreamError::Failed, ["Failed to request pad from webrtcbin"] ); @@ -1921,7 +1929,7 @@ impl BaseWebRTCSink { }; if let Some(msid) = stream.msid() { - gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad"); + gst::trace!(CAT, imp: self, "forwarding msid={msid:?} to webrtcbin sinkpad"); pad.set_property("msid", &msid); } @@ -1958,23 +1966,23 @@ impl BaseWebRTCSink { /// Prepare for accepting consumers, by setting /// up StreamProducers for each of our sink pads - fn prepare(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> { - gst::debug!(CAT, obj: element, "preparing"); + fn prepare(&self) -> Result<(), Error> { + gst::debug!(CAT, imp: self, "preparing"); self.state .lock() .unwrap() .streams .iter_mut() - .try_for_each(|(_, stream)| stream.prepare(element))?; + .try_for_each(|(_, stream)| stream.prepare(&self.obj()))?; Ok(()) } /// Unprepare by stopping consumers, then the signaller object. /// Might abort codec discovery - fn unprepare(&self, element: &super::BaseWebRTCSink) -> Result<(), Error> { - gst::info!(CAT, obj: element, "unpreparing"); + fn unprepare(&self) -> Result<(), Error> { + gst::info!(CAT, imp: self, "unpreparing"); let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); @@ -1985,27 +1993,27 @@ impl BaseWebRTCSink { let sessions: Vec<_> = session_ids .iter() - .filter_map(|id| state.end_session(id)) + .filter_map(|id| state.end_session(&self.obj(), id)) .collect(); state .streams .iter_mut() - .for_each(|(_, stream)| stream.unprepare(element)); + .for_each(|(_, stream)| stream.unprepare(&self.obj())); let codecs_abort_handle = std::mem::take(&mut state.codecs_abort_handles); codecs_abort_handle.into_iter().for_each(|handle| { handle.abort(); }); - gst::debug!(CAT, obj: element, "Waiting for codec discoveries to finish"); + gst::debug!(CAT, imp: self, "Waiting for codec discoveries to finish"); let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers); codecs_done_receiver.into_iter().for_each(|receiver| { RUNTIME.block_on(async { let _ = receiver.await; }); }); - gst::debug!(CAT, obj: element, "No codec discovery is running anymore"); + gst::debug!(CAT, imp: self, "No codec discovery is running anymore"); state.codec_discovery_done = false; state.codecs = BTreeMap::new(); @@ -2016,16 +2024,16 @@ impl BaseWebRTCSink { } drop(state); - gst::debug!(CAT, obj: element, "Ending sessions"); + gst::debug!(CAT, imp: self, "Ending sessions"); for session in sessions { signaller.end_session(&session.id); } - gst::debug!(CAT, obj: element, "All sessions have started finalizing"); + gst::debug!(CAT, imp: self, "All sessions have started finalizing"); if signaller_state == SignallerState::Started { - gst::info!(CAT, obj: element, "Stopping signaller"); + gst::info!(CAT, imp: self, "Stopping signaller"); signaller.stop(); - gst::info!(CAT, obj: element, "Stopped signaller"); + gst::info!(CAT, imp: self, "Stopped signaller"); } let finalizing_sessions = self.state.lock().unwrap().finalizing_sessions.clone(); @@ -2036,7 +2044,7 @@ impl BaseWebRTCSink { sessions = cvar.wait(sessions).unwrap(); } - gst::debug!(CAT, obj: element, "All sessions are done finalizing"); + gst::debug!(CAT, imp: self, "All sessions are done finalizing"); Ok(()) } @@ -2048,7 +2056,7 @@ impl BaseWebRTCSink { error: signaler.connect_closure( "error", false, - glib::closure!(@watch instance => move |_signaler: glib::Object, error: String| { + glib::closure!(#[watch] instance, move |_signaler: glib::Object, error: String| { gst::element_error!( instance, gst::StreamError::Failed, @@ -2060,7 +2068,7 @@ impl BaseWebRTCSink { request_meta: signaler.connect_closure( "request-meta", false, - glib::closure!(@watch instance => move |_signaler: glib::Object| -> Option { + glib::closure!(#[watch] instance, move |_signaler: glib::Object| -> Option { let meta = instance.imp().settings.lock().unwrap().meta.clone(); meta @@ -2070,9 +2078,9 @@ impl BaseWebRTCSink { session_requested: signaler.connect_closure( "session-requested", false, - glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ + glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str, peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ if let Err(err) = instance.imp().start_session(session_id, peer_id, offer) { - gst::warning!(CAT, "{}", err); + gst::warning!(CAT, obj: instance, "{}", err); } }), ), @@ -2080,13 +2088,13 @@ impl BaseWebRTCSink { session_description: signaler.connect_closure( "session-description", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaler: glib::Object, session_id: &str, session_description: &gst_webrtc::WebRTCSessionDescription| { if session_description.type_() == gst_webrtc::WebRTCSDPType::Answer { - instance.imp().handle_sdp_answer(instance, session_id, session_description); + instance.imp().handle_sdp_answer(session_id, session_description); } else { gst::error!(CAT, obj: instance, "Unsupported SDP Type"); } @@ -2097,7 +2105,7 @@ impl BaseWebRTCSink { handle_ice: signaler.connect_closure( "handle-ice", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaler: glib::Object, session_id: &str, sdp_m_line_index: u32, @@ -2112,9 +2120,9 @@ impl BaseWebRTCSink { session_ended: signaler.connect_closure( "session-ended", false, - glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{ - if let Err(err) = instance.imp().remove_session(instance, session_id, false) { - gst::warning!(CAT, "{}", err); + glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str|{ + if let Err(err) = instance.imp().remove_session(session_id, false) { + gst::warning!(CAT, obj: instance, "{}", err); } false }), @@ -2123,8 +2131,8 @@ impl BaseWebRTCSink { shutdown: signaler.connect_closure( "shutdown", false, - glib::closure!(@watch instance => move |_signaler: glib::Object|{ - instance.imp().shutdown(instance); + glib::closure!(#[watch] instance, move |_signaler: glib::Object|{ + instance.imp().shutdown(); }), ), }); @@ -2142,17 +2150,14 @@ impl BaseWebRTCSink { } /// Called by the signaller when it wants to shut down gracefully - fn shutdown(&self, element: &super::BaseWebRTCSink) { - gst::info!(CAT, "Shutting down"); - let _ = element.post_message(gst::message::Eos::builder().src(element).build()); + fn shutdown(&self) { + gst::info!(CAT, imp: self, "Shutting down"); + let _ = self + .obj() + .post_message(gst::message::Eos::builder().src(&*self.obj()).build()); } - fn on_offer_created( - &self, - _element: &super::BaseWebRTCSink, - offer: gst_webrtc::WebRTCSessionDescription, - session_id: &str, - ) { + fn on_offer_created(&self, offer: gst_webrtc::WebRTCSessionDescription, session_id: &str) { let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); @@ -2179,12 +2184,7 @@ impl BaseWebRTCSink { } } - fn on_answer_created( - &self, - element: &super::BaseWebRTCSink, - answer: gst_webrtc::WebRTCSessionDescription, - session_id: &str, - ) { + fn on_answer_created(&self, answer: gst_webrtc::WebRTCSessionDescription, session_id: &str) { let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); @@ -2213,9 +2213,9 @@ impl BaseWebRTCSink { let session_id = session.id.clone(); if let Some(session_wrapper) = state.sessions.get_mut(&session_id) { - session_wrapper.restore(session); + session_wrapper.restore(&self.obj(), session); } else { - gst::warning!(CAT, "Session {session_id} was removed"); + gst::warning!(CAT, imp: self, "Session {session_id} was removed"); } drop(state); @@ -2233,43 +2233,44 @@ impl BaseWebRTCSink { signaller.send_sdp(&session_id, &maybe_munged_answer); - self.on_remote_description_set(element, session_id) + self.on_remote_description_set(session_id) } } - fn on_remote_description_offer_set(&self, element: &super::BaseWebRTCSink, session_id: String) { + fn on_remote_description_offer_set(&self, session_id: String) { let state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get(&session_id) { - let element = element.downgrade(); - gst::debug!(CAT, "Creating answer for session {}", session_id); - let session_id = session_id.clone(); - let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "Created answer for session {}", session_id); + gst::debug!(CAT, imp: self, "Creating answer for session {}", session_id); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak(rename_to = this)] + self, + #[strong] + session_id, + move |reply| { + gst::debug!(CAT, imp: this, "Created answer for session {}", session_id); - if let Some(element) = element.upgrade() { - let this = element.imp(); let reply = match reply { Ok(Some(reply)) => reply, Ok(None) => { gst::warning!( CAT, - obj: element, + imp: this, "Promise returned without a reply for {}", session_id ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); return; } Err(err) => { gst::warning!( CAT, - obj: element, + imp: this, "Promise returned with an error for {}: {:?}", session_id, err ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); return; } }; @@ -2279,18 +2280,19 @@ impl BaseWebRTCSink { .get::() .unwrap() }) { - this.on_answer_created(&element, answer, &session_id); + this.on_answer_created(answer, &session_id); } else { gst::warning!( CAT, + imp: this, "Reply without an answer for session {}: {:?}", session_id, reply ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); } } - }); + )); session .unwrap() @@ -2300,7 +2302,7 @@ impl BaseWebRTCSink { } async fn select_codec( - element: &super::BaseWebRTCSink, + &self, discovery_info: &DiscoveryInfo, media: &gst_sdp::SDPMediaRef, in_caps: &gst::Caps, @@ -2376,7 +2378,7 @@ impl BaseWebRTCSink { } else { gst::warning!( CAT, - obj: element, + imp: self, "Failed to parse twcc index: {idx_str}" ); } @@ -2394,8 +2396,7 @@ impl BaseWebRTCSink { .map(|twcc_id| ExtensionConfigurationType::Apply { twcc_id }) .unwrap_or(ExtensionConfigurationType::Skip); - BaseWebRTCSink::run_discovery_pipeline( - element, + self.run_discovery_pipeline( stream_name, discovery_info, codec.clone(), @@ -2421,66 +2422,61 @@ impl BaseWebRTCSink { None } - fn negotiate( - &self, - element: &super::BaseWebRTCSink, - session_id: &str, - offer: Option<&gst_webrtc::WebRTCSessionDescription>, - ) { + fn negotiate(&self, session_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>) { let state = self.state.lock().unwrap(); - gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id); + gst::debug!(CAT, imp: self, "Negotiating for session {}", session_id); if let Some(session) = state.sessions.get(session_id) { let session = session.unwrap(); - gst::trace!(CAT, "WebRTC pads: {:?}", session.webrtc_pads); + gst::trace!(CAT, imp: self, "WebRTC pads: {:?}", session.webrtc_pads); if let Some(offer) = offer { - let element = element.downgrade(); - let session_id = session_id.to_string(); - - let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "received reply {:?}", reply); - if let Some(element) = element.upgrade() { - let this = element.imp(); - - this.on_remote_description_offer_set(&element, session_id); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |reply| { + gst::debug!(CAT, imp: this, "received reply {:?}", reply); + this.on_remote_description_offer_set(session_id); } - }); + )); session .webrtcbin .emit_by_name::<()>("set-remote-description", &[&offer, &promise]); } else { - let element = element.downgrade(); - gst::debug!(CAT, "Creating offer for session {}", session_id); - let session_id = session_id.to_string(); - let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "Created offer for session {}", session_id); + gst::debug!(CAT, imp: self, "Creating offer for session {}", session_id); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |reply| { + gst::debug!(CAT, imp: this, "Created offer for session {}", session_id); - if let Some(element) = element.upgrade() { - let this = element.imp(); let reply = match reply { Ok(Some(reply)) => reply, Ok(None) => { gst::warning!( CAT, - obj: element, + imp: this, "Promise returned without a reply for {}", session_id ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); return; } Err(err) => { gst::warning!( CAT, - obj: element, + imp: this, "Promise returned with an error for {}: {:?}", session_id, err ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); return; } }; @@ -2488,7 +2484,7 @@ impl BaseWebRTCSink { if let Ok(offer) = reply.value("offer").map(|offer| { offer.get::().unwrap() }) { - this.on_offer_created(&element, offer, &session_id); + this.on_offer_created(offer, &session_id); } else { gst::warning!( CAT, @@ -2496,10 +2492,10 @@ impl BaseWebRTCSink { session_id, reply ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); } } - }); + )); session .webrtcbin @@ -2508,7 +2504,7 @@ impl BaseWebRTCSink { } else { gst::debug!( CAT, - obj: element, + imp: self, "consumer for session {} no longer exists (sessions: {:?}", session_id, state.sessions.keys() @@ -2516,13 +2512,7 @@ impl BaseWebRTCSink { } } - fn on_ice_candidate( - &self, - _element: &super::BaseWebRTCSink, - session_id: String, - sdp_m_line_index: u32, - candidate: String, - ) { + fn on_ice_candidate(&self, session_id: String, sdp_m_line_index: u32, candidate: String) { let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); @@ -2595,9 +2585,14 @@ impl BaseWebRTCSink { webrtcbin.connect_closure( "request-aux-sender", false, - glib::closure!(@watch element, @strong session_id, @weak-allow-none cc - => move |_webrtcbin: gst::Element, _transport: gst::Object| { - if let Some(ref cc) = cc { + glib::closure!( + #[watch] + element, + #[strong] + session_id, + #[strong] + cc, + move |_webrtcbin: gst::Element, _transport: gst::Object| { let settings = element.imp().settings.lock().unwrap(); // TODO: Bind properties with @element's @@ -2607,17 +2602,25 @@ impl BaseWebRTCSink { ("max-bitrate", &settings.cc_info.max_bitrate), ]); - cc.connect_notify(Some("estimated-bitrate"), - glib::clone!(@weak element, @strong session_id - => move |bwe, pspec| { - element.imp().set_bitrate(&element, &session_id, - bwe.property::(pspec.name())); - } - )); - } + cc.connect_notify( + Some("estimated-bitrate"), + glib::clone!( + #[weak] + element, + #[strong] + session_id, + move |bwe, pspec| { + element.imp().set_bitrate( + &session_id, + bwe.property::(pspec.name()), + ); + } + ), + ); - cc - }), + cc.clone() + } + ), ); Some(cc) @@ -2627,18 +2630,25 @@ impl BaseWebRTCSink { webrtcbin.connect_closure( "deep-element-added", false, - glib::closure!(@watch element, @strong session_id - => move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { - - if e.factory().is_some_and(|f| f.name() == "rtprtxsend") { - if e.has_property("stuffing-kbps", Some(i32::static_type())) { - element.imp().set_rtptrxsend(element, &session_id, e); - } else { - gst::warning!(CAT, "rtprtxsend doesn't have a `stuffing-kbps` \ - property, stuffing disabled"); + glib::closure!( + #[watch] + element, + #[strong] + session_id, + move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { + if e.factory().map_or(false, |f| f.name() == "rtprtxsend") { + if e.has_property("stuffing-kbps", Some(i32::static_type())) { + element.imp().set_rtptrxsend(&session_id, e); + } else { + gst::warning!( + CAT, + "rtprtxsend doesn't have a `stuffing-kbps` \ + property, stuffing disabled" + ); + } } } - }), + ), ); rtpgccbwe @@ -2648,125 +2658,141 @@ impl BaseWebRTCSink { pipeline.add(&webrtcbin).unwrap(); - let element_clone = element.downgrade(); - let session_id_clone = session_id.clone(); - webrtcbin.connect("on-ice-candidate", false, move |values| { - if let Some(element) = element_clone.upgrade() { - let this = element.imp(); - let sdp_m_line_index = values[1].get::().expect("Invalid argument"); - let candidate = values[2].get::().expect("Invalid argument"); - this.on_ice_candidate( - &element, - session_id_clone.to_string(), - sdp_m_line_index, - candidate, - ); - } - None - }); + webrtcbin.connect_closure( + "on-ice-candidate", + false, + glib::closure!( + #[watch] + element, + #[strong] + session_id, + move |_webrtcbin: &gst::Element, sdp_m_line_index: u32, candidate: String| { + let this = element.imp(); + this.on_ice_candidate(session_id.to_string(), sdp_m_line_index, candidate); + } + ), + ); - let element_clone = element.downgrade(); - let peer_id_clone = peer_id.clone(); - let session_id_clone = session_id.clone(); - webrtcbin.connect_notify(Some("connection-state"), move |webrtcbin, _pspec| { - if let Some(element) = element_clone.upgrade() { - let state = - webrtcbin.property::("connection-state"); + webrtcbin.connect_notify( + Some("connection-state"), + glib::clone!( + #[weak] + element, + #[strong] + peer_id, + #[strong] + session_id, + move |webrtcbin, _pspec| { + let state = webrtcbin + .property::("connection-state"); - match state { - gst_webrtc::WebRTCPeerConnectionState::Failed => { - let this = element.imp(); - gst::warning!( - CAT, - obj: element, - "Connection state for in session {} (peer {}) failed", - session_id_clone, - peer_id_clone - ); - let _ = this.remove_session(&element, &session_id_clone, true); - } - _ => { - gst::log!( - CAT, - obj: element, - "Connection state in session {} (peer {}) changed: {:?}", - session_id_clone, - peer_id_clone, - state - ); + match state { + gst_webrtc::WebRTCPeerConnectionState::Failed => { + let this = element.imp(); + gst::warning!( + CAT, + obj: element, + "Connection state for in session {} (peer {}) failed", + session_id, + peer_id + ); + let _ = this.remove_session(&session_id, true); + } + _ => { + gst::log!( + CAT, + obj: element, + "Connection state in session {} (peer {}) changed: {:?}", + session_id, + peer_id, + state + ); + } } } - } - }); + ), + ); - let element_clone = element.downgrade(); - let peer_id_clone = peer_id.clone(); - let session_id_clone = session_id.clone(); - webrtcbin.connect_notify(Some("ice-connection-state"), move |webrtcbin, _pspec| { - if let Some(element) = element_clone.upgrade() { - let state = webrtcbin - .property::("ice-connection-state"); - let this = element.imp(); + webrtcbin.connect_notify( + Some("ice-connection-state"), + glib::clone!( + #[weak] + element, + #[strong] + peer_id, + #[strong] + session_id, + move |webrtcbin, _pspec| { + let state = webrtcbin + .property::("ice-connection-state"); + let this = element.imp(); - match state { - gst_webrtc::WebRTCICEConnectionState::Failed => { - gst::warning!( - CAT, - obj: element, - "Ice connection state in session {} (peer {}) failed", - session_id_clone, - peer_id_clone, - ); - let _ = this.remove_session(&element, &session_id_clone, true); + match state { + gst_webrtc::WebRTCICEConnectionState::Failed => { + gst::warning!( + CAT, + obj: element, + "Ice connection state in session {} (peer {}) failed", + session_id, + peer_id, + ); + let _ = this.remove_session(&session_id, true); + } + _ => { + gst::log!( + CAT, + obj: element, + "Ice connection state in session {} (peer {}) changed: {:?}", + session_id, + peer_id, + state + ); + } } - _ => { - gst::log!( - CAT, - obj: element, - "Ice connection state in session {} (peer {}) changed: {:?}", - session_id_clone, - peer_id_clone, - state - ); - } - } - if state == gst_webrtc::WebRTCICEConnectionState::Completed { - let state = this.state.lock().unwrap(); + if state == gst_webrtc::WebRTCICEConnectionState::Completed { + let state = this.state.lock().unwrap(); - if let Some(session) = state.sessions.get(&session_id_clone) { - for webrtc_pad in session.unwrap().webrtc_pads.values() { - if let Some(srcpad) = webrtc_pad.pad.peer() { - srcpad.send_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); + if let Some(session) = state.sessions.get(&session_id) { + for webrtc_pad in session.unwrap().webrtc_pads.values() { + if let Some(srcpad) = webrtc_pad.pad.peer() { + srcpad.send_event( + gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(), + ); + } } } } } - } - }); + ), + ); - let element_clone = element.downgrade(); - let peer_id_clone = peer_id.clone(); - let session_id_clone = session_id.clone(); - webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { - let state = - webrtcbin.property::("ice-gathering-state"); + webrtcbin.connect_notify( + Some("ice-gathering-state"), + glib::clone!( + #[weak] + element, + #[strong] + peer_id, + #[strong] + session_id, + move |webrtcbin, _pspec| { + let state = webrtcbin + .property::("ice-gathering-state"); - if let Some(element) = element_clone.upgrade() { - gst::log!( - CAT, - obj: element, - "Ice gathering state in session {} (peer {}) changed: {:?}", - session_id_clone, - peer_id_clone, - state - ); - } - }); + gst::log!( + CAT, + obj: element, + "Ice gathering state in session {} (peer {}) changed: {:?}", + session_id, + peer_id, + state + ); + } + ), + ); let session = Session::new( session_id.clone(), @@ -2793,29 +2819,39 @@ impl BaseWebRTCSink { if session.congestion_controller.is_some() { let session_id_str = session_id.to_string(); - rtpbin.connect_closure("on-new-ssrc", true, - glib::closure!(@weak-allow-none element, - => move |rtpbin: gst::Object, session_id: u32, _src: u32| { - let rtp_session = rtpbin.emit_by_name::("get-session", &[&session_id]); + rtpbin.connect_closure( + "on-new-ssrc", + true, + glib::closure!( + #[watch] + element, + move |rtpbin: gst::Object, session_id: u32, _src: u32| { + let rtp_session = + rtpbin.emit_by_name::("get-session", &[&session_id]); - let element = element.expect("on-new-ssrc emitted when webrtcsink has been disposed?"); let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&session_id_str) { let session = session.unwrap_mut(); if session.stats_sigid.is_none() { let session_id_str = session_id_str.clone(); - let element = element.downgrade(); - session.stats_sigid = Some(rtp_session.connect_notify(Some("twcc-stats"), - move |sess, pspec| { - if let Some(element) = element.upgrade() { + session.stats_sigid = Some(rtp_session.connect_notify( + Some("twcc-stats"), + glib::clone!( + #[weak] + element, + move |sess, pspec| { // Run the Loss-based control algorithm on new peer TWCC feedbacks - element.imp().process_loss_stats(&element, &session_id_str, &sess.property::(pspec.name())); + element.imp().process_loss_stats( + &session_id_str, + &sess.property::(pspec.name()), + ); } - } + ), )); } } - }), + } + ), ); } @@ -2849,7 +2885,7 @@ impl BaseWebRTCSink { err.error(), err.debug() ); - let _ = this.remove_session(&element, &session_id_clone, true); + let _ = this.remove_session(&session_id_clone, true); } gst::MessageView::StateChanged(state_changed) => { if state_changed.src() == Some(pipeline.upcast_ref()) { @@ -2874,7 +2910,7 @@ impl BaseWebRTCSink { "Unexpected end of stream in session {}", session_id_clone, ); - let _ = this.remove_session(&element, &session_id_clone, true); + let _ = this.remove_session(&session_id_clone, true); } _ => (), } @@ -2889,19 +2925,19 @@ impl BaseWebRTCSink { streams.sort_by_key(|s| s.serial); - let element_clone = element.downgrade(); - let offer_clone = offer.cloned(); - RUNTIME.spawn(async move { - if let Some(element) = element_clone.upgrade() { - let this = element.imp(); - + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + #[strong(rename_to = offer)] + offer.cloned(), + async move { let settings_clone = this.settings.lock().unwrap().clone(); let signaller = settings_clone.signaller.clone(); let mut webrtc_pads: HashMap = HashMap::new(); let mut codecs: BTreeMap = BTreeMap::new(); - if let Some(ref offer) = offer_clone { + if let Some(ref offer) = offer { for media in offer.sdp().medias() { let media_is_video = match media.media() { Some("audio") => false, @@ -2923,8 +2959,7 @@ impl BaseWebRTCSink { media_is_video == stream_is_video }) { let mut stream = streams.remove(idx); - BaseWebRTCSink::request_webrtcbin_pad( - &element, + this.request_webrtcbin_pad( &webrtcbin, &mut stream, Some(media), @@ -2934,8 +2969,7 @@ impl BaseWebRTCSink { ) .await; } else { - BaseWebRTCSink::request_inactive_webrtcbin_pad( - &element, + this.request_inactive_webrtcbin_pad( &webrtcbin, &mut webrtc_pads, media_is_video, @@ -2944,8 +2978,7 @@ impl BaseWebRTCSink { } } else { for mut stream in streams { - BaseWebRTCSink::request_webrtcbin_pad( - &element, + this.request_webrtcbin_pad( &webrtcbin, &mut stream, None, @@ -2966,7 +2999,7 @@ impl BaseWebRTCSink { if let Some(session) = state.sessions.get_mut(&session_id) { let session = session.unwrap_mut(); session.webrtc_pads = webrtc_pads; - if offer_clone.is_some() { + if offer.is_some() { session.codecs = Some(codecs); } } @@ -2979,7 +3012,7 @@ impl BaseWebRTCSink { "Failed to bring {peer_id} pipeline to READY: {}", err ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); return; } @@ -3005,7 +3038,7 @@ impl BaseWebRTCSink { // // This is completely safe, as we know that by now all conditions are gathered: // webrtcbin is in the Ready state, and all its transceivers have codec_preferences. - this.negotiate(&element, &session_id, offer_clone.as_ref()); + this.negotiate(&session_id, offer.as_ref()); if let Err(err) = pipeline.set_state(gst::State::Playing) { gst::warning!( @@ -3014,21 +3047,16 @@ impl BaseWebRTCSink { "Failed to bring {peer_id} pipeline to PLAYING: {}", err ); - let _ = this.remove_session(&element, &session_id, true); + let _ = this.remove_session(&session_id, true); } } - }); + )); Ok(()) } /// Called by the signaller to remove a consumer - fn remove_session( - &self, - element: &super::BaseWebRTCSink, - session_id: &str, - signal: bool, - ) -> Result<(), WebRTCSinkError> { + fn remove_session(&self, session_id: &str, signal: bool) -> Result<(), WebRTCSinkError> { let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); @@ -3038,69 +3066,63 @@ impl BaseWebRTCSink { return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); } - if let Some(session) = state.end_session(session_id) { + if let Some(session) = state.end_session(&self.obj(), session_id) { drop(state); signaller .emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); if signal { signaller.end_session(session_id); } - element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); + self.obj() + .emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); } Ok(()) } - fn process_loss_stats( - &self, - element: &super::BaseWebRTCSink, - session_id: &str, - stats: &gst::Structure, - ) { - let mut state = element.imp().state.lock().unwrap(); + fn process_loss_stats(&self, session_id: &str, stats: &gst::Structure) { + let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { let session = session.unwrap_mut(); if let Some(congestion_controller) = session.congestion_controller.as_mut() { - congestion_controller.loss_control(element, stats, &mut session.encoders); + congestion_controller.loss_control(&self.obj(), stats, &mut session.encoders); } stats.clone_into(&mut session.stats); } } - fn process_stats( - &self, - element: &super::BaseWebRTCSink, - webrtcbin: gst::Element, - session_id: &str, - ) { + fn process_stats(&self, webrtcbin: gst::Element, session_id: &str) { let session_id = session_id.to_string(); - let promise = gst::Promise::with_change_func( - glib::clone!(@strong session_id, @weak element => move |reply| { + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak(rename_to = this)] + self, + #[strong] + session_id, + move |reply| { if let Ok(Some(stats)) = reply { - - let mut state = element.imp().state.lock().unwrap(); + let mut state = this.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&session_id) { let session = session.unwrap_mut(); - if let Some(congestion_controller) = session.congestion_controller.as_mut() { - congestion_controller.delay_control(&element, stats, &mut session.encoders,); + if let Some(congestion_controller) = session.congestion_controller.as_mut() + { + congestion_controller.delay_control( + &this.obj(), + stats, + &mut session.encoders, + ); } session.stats = stats.to_owned(); } } - }), - ); + } + )); webrtcbin.emit_by_name::<()>("get-stats", &[&None::, &promise]); } #[cfg(feature = "v1_22")] - fn set_rtptrxsend( - &self, - element: &super::BaseWebRTCSink, - session_id: &str, - rtprtxsend: gst::Element, - ) { - let mut state = element.imp().state.lock().unwrap(); + fn set_rtptrxsend(&self, session_id: &str, rtprtxsend: gst::Element) { + let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { session.unwrap_mut().rtprtxsend = Some(rtprtxsend); @@ -3108,9 +3130,9 @@ impl BaseWebRTCSink { } #[cfg(feature = "v1_22")] - fn set_bitrate(&self, element: &super::BaseWebRTCSink, session_id: &str, bitrate: u32) { - let settings = element.imp().settings.lock().unwrap(); - let mut state = element.imp().state.lock().unwrap(); + fn set_bitrate(&self, session_id: &str, bitrate: u32) { + let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { let session = session.unwrap_mut(); @@ -3136,7 +3158,7 @@ impl BaseWebRTCSink { } for encoder in session.encoders.iter_mut() { - if encoder.set_bitrate(element, encoders_bitrate).is_ok() { + if encoder.set_bitrate(&self.obj(), encoders_bitrate).is_ok() { encoder .transceiver .set_property("fec-percentage", (fec_percentage as u32).min(100)); @@ -3145,7 +3167,7 @@ impl BaseWebRTCSink { } } - fn on_remote_description_set(&self, element: &super::BaseWebRTCSink, session_id: String) { + fn on_remote_description_set(&self, session_id: String) { let mut state = self.state.lock().unwrap(); let mut remove = false; let codecs = state.codecs.clone(); @@ -3173,11 +3195,11 @@ impl BaseWebRTCSink { { drop(state); if let Err(err) = - session.connect_input_stream(element, &producer, webrtc_pad, &codecs) + session.connect_input_stream(&self.obj(), &producer, webrtc_pad, &codecs) { gst::error!( CAT, - obj: element, + imp: self, "Failed to connect input stream {} for session {}: {}", stream_name, session_id, @@ -3191,7 +3213,7 @@ impl BaseWebRTCSink { } else { gst::error!( CAT, - obj: element, + imp: self, "No producer to connect session {} to", session_id, ); @@ -3205,7 +3227,7 @@ impl BaseWebRTCSink { format!("webrtcsink-peer-{session_id}-remote-description-set",), ); - let element_clone = element.downgrade(); + let this_weak = self.downgrade(); let webrtcbin = session.webrtcbin.downgrade(); let session_id_clone = session_id.clone(); session.stats_collection_handle = Some(RUNTIME.spawn(async move { @@ -3213,13 +3235,10 @@ impl BaseWebRTCSink { loop { interval.tick().await; - let element_clone = element_clone.clone(); - if let (Some(webrtcbin), Some(element)) = - (webrtcbin.upgrade(), element_clone.upgrade()) + if let (Some(webrtcbin), Some(this)) = + (webrtcbin.upgrade(), this_weak.upgrade()) { - element - .imp() - .process_stats(&element, webrtcbin, &session_id_clone); + this.process_stats(webrtcbin, &session_id_clone); } else { break; } @@ -3228,16 +3247,16 @@ impl BaseWebRTCSink { if remove { let _ = state.sessions.remove(&session_id); - state.finalize_session(&mut session); + state.finalize_session(&self.obj(), &mut session); drop(state); let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); drop(settings); signaller.end_session(&session_id); } else if let Some(session_wrapper) = state.sessions.get_mut(&session_id) { - session_wrapper.restore(session); + session_wrapper.restore(&self.obj(), session); } else { - gst::warning!(CAT, "Session {session_id} was removed"); + gst::warning!(CAT, imp: self, "Session {session_id} was removed"); } } } @@ -3255,24 +3274,19 @@ impl BaseWebRTCSink { let sdp_m_line_index = match sdp_m_line_index { Some(sdp_m_line_index) => sdp_m_line_index, None => { - gst::warning!(CAT, "No mandatory SDP m-line index"); + gst::warning!(CAT, imp: self, "No mandatory SDP m-line index"); return; } }; if let Some(session_wrapper) = state.sessions.get_mut(session_id) { - session_wrapper.add_ice_candidate(session_id, sdp_m_line_index, candidate); + session_wrapper.add_ice_candidate(&self.obj(), session_id, sdp_m_line_index, candidate); } else { - gst::warning!(CAT, "No consumer with ID {session_id}"); + gst::warning!(CAT, imp: self, "No consumer with ID {session_id}"); } } - fn handle_sdp_answer( - &self, - element: &super::BaseWebRTCSink, - session_id: &str, - desc: &gst_webrtc::WebRTCSessionDescription, - ) { + fn handle_sdp_answer(&self, session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription) { let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { @@ -3294,12 +3308,13 @@ impl BaseWebRTCSink { gst::warning!( CAT, + imp: self, "consumer from session {} refused media {}: {:?}", session_id, media_idx, media_str ); - if let Some(_session) = state.end_session(session_id) { + if let Some(_session) = state.end_session(&self.obj(), session_id) { drop(state); let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); @@ -3309,7 +3324,7 @@ impl BaseWebRTCSink { gst::warning!( CAT, - obj: element, + imp: self, "Consumer refused media {session_id}, {media_idx}" ); return; @@ -3325,13 +3340,14 @@ impl BaseWebRTCSink { } else { gst::warning!( CAT, + imp: self, "consumer from session {} did not provide valid payload for media index {} for session {}", session_id, media_idx, session_id, ); - if let Some(_session) = state.end_session(session_id) { + if let Some(_session) = state.end_session(&self.obj(), session_id) { drop(state); let settings = self.settings.lock().unwrap(); let signaller = settings.signaller.clone(); @@ -3339,33 +3355,32 @@ impl BaseWebRTCSink { signaller.end_session(session_id); } - gst::warning!(CAT, obj: element, "Consumer did not provide valid payload for media session: {session_id} media_ix: {media_idx}"); + gst::warning!(CAT, imp: self, "Consumer did not provide valid payload for media session: {session_id} media_ix: {media_idx}"); return; } } - let element = element.downgrade(); - let session_id = session_id.to_string(); - - let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "received reply {:?}", reply); - if let Some(element) = element.upgrade() { - let this = element.imp(); - - this.on_remote_description_set(&element, session_id); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |reply| { + gst::debug!(CAT, imp: this, "received reply {:?}", reply); + this.on_remote_description_set(session_id); } - }); + )); session .webrtcbin .emit_by_name::<()>("set-remote-description", &[desc, &promise]); } else { - gst::warning!(CAT, "No consumer with ID {session_id}"); + gst::warning!(CAT, imp: self, "No consumer with ID {session_id}"); } } async fn run_discovery_pipeline( - element: &super::BaseWebRTCSink, + &self, stream_name: &str, discovery_info: &DiscoveryInfo, codec: Codec, @@ -3391,11 +3406,11 @@ impl BaseWebRTCSink { gst::debug!( CAT, - obj: element, + imp: self, "Running discovery pipeline for input caps {input_caps} and output caps {output_caps} with codec {codec:?}" ); - gst::debug!(CAT, obj: element, "Running discovery pipeline"); + gst::debug!(CAT, imp: self, "Running discovery pipeline"); let elements_slice = &elements.iter().collect::>(); pipe.0.add_many(elements_slice).unwrap(); gst::Element::link_many(elements_slice) @@ -3406,7 +3421,7 @@ impl BaseWebRTCSink { .expect("Caps should always be set when starting discovery"), output_caps, &codec, - element.emit_by_name::>( + self.obj().emit_by_name::>( "request-encoded-filter", &[&Option::::None, &stream_name, &codec.caps], ), @@ -3418,13 +3433,13 @@ impl BaseWebRTCSink { } = payload_chain_builder.build(&pipe.0, &encoding_chain_src)?; if let Some(ref enc) = encoding_chain.encoder { - element.emit_by_name::( + self.obj().emit_by_name::( "encoder-setup", &[&"discovery".to_string(), &stream_name, &enc], ); } - element.imp().configure_payloader( + self.configure_payloader( "discovery", stream_name, &payloader, @@ -3462,14 +3477,14 @@ impl BaseWebRTCSink { .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; let bus = pipe.0.bus().unwrap(); - let mut stream = CustomBusStream::new(element, &bus); + let mut stream = CustomBusStream::new(&self.obj(), &bus); pipe.0 .set_state(gst::State::Playing) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; { - let mut state = element.imp().state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); state.queue_discovery(stream_name, discovery_info.clone()); } @@ -3478,7 +3493,7 @@ impl BaseWebRTCSink { if let Some(msg) = stream.next().await { match msg.view() { gst::MessageView::Error(err) => { - gst::warning!(CAT, "Error in discovery pipeline: {err:#?}"); + gst::warning!(CAT, imp: self, "Error in discovery pipeline: {err:#?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), "webrtcsink-discovery-error", @@ -3511,7 +3526,7 @@ impl BaseWebRTCSink { _ => continue, }; - gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); + gst::info!(CAT, imp: self, "Discovery pipeline got caps {caps:?}"); pipe.0.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), format!("webrtcsink-discovery-{}-done", pipe.0.name()), @@ -3529,7 +3544,7 @@ impl BaseWebRTCSink { s.set("payload", codec.payload().unwrap()); gst::debug!( CAT, - obj: element, + imp: self, "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" ); break Ok(s); @@ -3547,14 +3562,14 @@ impl BaseWebRTCSink { } }; - let mut state = element.imp().state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); state.remove_discovery(stream_name, discovery_info); ret } async fn lookup_caps( - element: &super::BaseWebRTCSink, + &self, discovery_info: DiscoveryInfo, name: String, output_caps: gst::Caps, @@ -3565,15 +3580,14 @@ impl BaseWebRTCSink { gst::info!( CAT, - obj: element, + imp: self, "Stream is already encoded with codec {}, still need to payload it", codec.name ); caps = cleanup_codec_caps(caps); - vec![BaseWebRTCSink::run_discovery_pipeline( - element, + vec![self.run_discovery_pipeline( &name, &discovery_info, codec, @@ -3603,8 +3617,7 @@ impl BaseWebRTCSink { .iter() .filter(|codec| codec.is_video() == is_video) .map(|codec| { - BaseWebRTCSink::run_discovery_pipeline( - element, + self.run_discovery_pipeline( &name, &discovery_info, codec.clone(), @@ -3630,7 +3643,7 @@ impl BaseWebRTCSink { */ gst::warning!( CAT, - obj: element, + imp: self, "Codec discovery pipeline failed: {}", err ); @@ -3638,7 +3651,7 @@ impl BaseWebRTCSink { } } - let mut state = element.imp().state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); if let Some(stream) = state.streams.get_mut(&name) { stream.out_caps = Some(payloader_caps.clone()); } @@ -3810,55 +3823,60 @@ impl BaseWebRTCSink { }; let stream_name_clone = stream_name.to_owned(); - RUNTIME.spawn(glib::clone!(@weak self as this, @strong discovery_info => async move { - let element = &*this.obj(); - let (fut, handle) = futures::future::abortable( - Self::lookup_caps( - element, + RUNTIME.spawn(glib::clone!( + #[to_owned(rename_to = this)] + self, + #[strong] + discovery_info, + async move { + let (fut, handle) = futures::future::abortable(this.lookup_caps( discovery_info.clone(), stream_name_clone.clone(), gst::Caps::new_any(), &codecs, )); - let (codecs_done_sender, codecs_done_receiver) = - futures::channel::oneshot::channel(); + let (codecs_done_sender, codecs_done_receiver) = + futures::channel::oneshot::channel(); - // Compiler isn't budged by dropping state before await, - // so let's make a new scope instead. - { - let mut state = this.state.lock().unwrap(); - state.codecs_abort_handles.push(handle); - state.codecs_done_receivers.push(codecs_done_receiver); - } - - match fut.await { - Ok(Err(err)) => { - gst::error!(CAT, imp: this, "Error running discovery: {err:?}"); - gst::element_error!( - this.obj(), - gst::StreamError::CodecNotFound, - ["Failed to look up output caps: {err:?}"] - ); - } - Ok(Ok(_)) => { - let settings = this.settings.lock().unwrap(); + // Compiler isn't budged by dropping state before await, + // so let's make a new scope instead. + { let mut state = this.state.lock().unwrap(); - state.codec_discovery_done = state.streams.values().all(|stream| stream.out_caps.is_some()); - let signaller = settings.signaller.clone(); - drop(settings); - if state.should_start_signaller(element) { - state.signaller_state = SignallerState::Started; - drop(state); - signaller.start(); - } + state.codecs_abort_handles.push(handle); + state.codecs_done_receivers.push(codecs_done_receiver); } - _ => (), + + match fut.await { + Ok(Err(err)) => { + gst::error!(CAT, imp: this, "Error running discovery: {err:?}"); + gst::element_error!( + this.obj(), + gst::StreamError::CodecNotFound, + ["Failed to look up output caps: {err:?}"] + ); + } + Ok(Ok(_)) => { + let settings = this.settings.lock().unwrap(); + let mut state = this.state.lock().unwrap(); + state.codec_discovery_done = state + .streams + .values() + .all(|stream| stream.out_caps.is_some()); + let signaller = settings.signaller.clone(); + drop(settings); + if state.should_start_signaller(&this.obj()) { + state.signaller_state = SignallerState::Started; + drop(state); + signaller.start(); + } + } + _ => (), + } + + let _ = codecs_done_sender.send(()); } - - - let _ = codecs_done_sender.send(()); - })); + )); } fn chain( @@ -4384,7 +4402,7 @@ impl ElementImpl for BaseWebRTCSink { ) -> Option { let element = self.obj(); if element.current_state() > gst::State::Ready { - gst::error!(CAT, "element pads can only be requested before starting"); + gst::error!(CAT, imp: self, "element pads can only be requested before starting"); return None; } @@ -4450,7 +4468,7 @@ impl ElementImpl for BaseWebRTCSink { ) -> Result { let element = self.obj(); if let gst::StateChange::ReadyToPaused = transition { - if let Err(err) = self.prepare(&element) { + if let Err(err) = self.prepare() { gst::element_error!( element, gst::StreamError::Failed, @@ -4477,11 +4495,11 @@ impl ElementImpl for BaseWebRTCSink { ); let (tx, rx) = mpsc::channel(); element.call_async(move |element| { - tx.send(element.imp().unprepare(element)).unwrap(); + tx.send(element.imp().unprepare()).unwrap(); }); rx.recv().unwrap() } - Err(_) => self.unprepare(&element), + Err(_) => self.unprepare(), }; if let Err(err) = unprepare_res { @@ -4543,7 +4561,7 @@ impl NavigationImpl for BaseWebRTCSink { gst::log!(CAT, "Navigating to: {:?}", event); // FIXME: Handle multi tracks. if !stream.sink_pad.push_event(event.clone()) { - gst::info!(CAT, "Could not send event: {:?}", event); + gst::info!(CAT, imp: self, "Could not send event: {:?}", event); } } }); diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 859b80f4..c76c45cb 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -406,8 +406,13 @@ impl Session { } let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src) - .proxy_pad_chain_function(glib::clone!(@weak element, @strong self.id as sess_id => @default-panic, move - |pad, parent, buffer| { + .proxy_pad_chain_function(glib::clone!( + #[weak] + element, + #[strong(rename_to = sess_id)] + self.id, + #[upgrade_or_panic] + move |pad, parent, buffer| { let padret = gst::ProxyPad::chain_default(pad, parent, buffer); let state = element.imp().state.lock().unwrap(); let Some(session) = state.sessions.get(&sess_id) else { @@ -418,30 +423,44 @@ impl Session { f } )) - .proxy_pad_event_function(glib::clone!(@weak element , @weak webrtcbin_pad as webrtcpad, @strong self.id as sess_id => @default-panic, move |pad, parent, event| { - let event = if let gst::EventView::StreamStart(stream_start) = event.view() { - let state = element.imp().state.lock().unwrap(); - if let Some(session) = state.sessions.get(&sess_id) { - session.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element) - .map(|srcpad| { - gst::event::StreamStart::builder(&srcpad.imp().stream_id()) - .seqnum(stream_start.seqnum()) - .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)) - .build() - }).unwrap_or(event) + .proxy_pad_event_function(glib::clone!( + #[weak] + element, + #[weak(rename_to = webrtcpad)] + webrtcbin_pad, + #[strong(rename_to = sess_id)] + self.id, + #[upgrade_or_panic] + move |pad, parent, event| { + let event = if let gst::EventView::StreamStart(stream_start) = event.view() { + let state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get(&sess_id) { + session + .get_src_pad_from_webrtcbin_pad(&webrtcpad, &element) + .map(|srcpad| { + gst::event::StreamStart::builder(&srcpad.imp().stream_id()) + .seqnum(stream_start.seqnum()) + .group_id( + stream_start + .group_id() + .unwrap_or_else(gst::GroupId::next), + ) + .build() + }) + .unwrap_or(event) + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + event + } } else { - gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); event - } - } else { - event - }; + }; - gst::Pad::event_default(pad, parent, event) - })) + gst::Pad::event_default(pad, parent, event) + } + )) .build(); - let sess_id = self.id.clone(); if element .imp() .settings @@ -451,23 +470,34 @@ impl Session { { webrtcbin_pad.add_probe( gst::PadProbeType::EVENT_UPSTREAM, - glib::clone!(@weak element => @default-panic, move |_pad, info| { - let Some(ev) = info.event() else { - return gst::PadProbeReturn::Ok; - }; - if ev.type_() != gst::EventType::Navigation { - return gst::PadProbeReturn::Ok; - }; + glib::clone!( + #[weak] + element, + #[strong(rename_to = sess_id)] + self.id, + #[upgrade_or] + gst::PadProbeReturn::Remove, + move |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + if ev.type_() != gst::EventType::Navigation { + return gst::PadProbeReturn::Ok; + }; - let mut state = element.imp().state.lock().unwrap(); - if let Some(session) = state.sessions.get_mut(&sess_id) { - session.send_navigation_event(gst_video::NavigationEvent::parse(ev).unwrap(), &element); - } else { - gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + let mut state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get_mut(&sess_id) { + session.send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &element, + ); + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + } + + gst::PadProbeReturn::Ok } - - gst::PadProbeReturn::Ok - }), + ), ); } @@ -502,15 +532,17 @@ impl Session { .expect("decodebin3 needs to be present!"); bin.add(&decodebin).unwrap(); decodebin.sync_state_with_parent().unwrap(); - decodebin.connect_pad_added( - glib::clone!(@weak element as this, @weak ghostpad as ghostpad => move |_webrtcbin, pad| { + decodebin.connect_pad_added(glib::clone!( + #[weak] + ghostpad, + move |_webrtcbin, pad| { if pad.direction() == gst::PadDirection::Sink { return; } ghostpad.set_target(Some(pad)).unwrap(); - }), - ); + } + )); gst::debug!(CAT, obj: element, "Decoding for {}", srcpad.imp().stream_id()); @@ -684,20 +716,21 @@ impl Session { gst::info!(CAT, obj: element, "Set remote description"); - let obj = element.clone(); - - let session_id = self.id.clone(); - let promise = - gst::Promise::with_change_func(glib::clone!(@weak element as ele => move |reply| { - let state = ele.imp().state.lock().unwrap(); - gst::info!(CAT, obj: ele, "got answer for session {session_id:?}"); - let Some(session) = state.sessions.get(&session_id) else { - gst::error!(CAT, obj: ele , "no session {session_id:?}"); - return - }; - session.on_answer_created(reply, &obj); - } - )); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak] + element, + #[strong(rename_to = session_id)] + self.id, + move |reply| { + let state = element.imp().state.lock().unwrap(); + gst::info!(CAT, obj: element, "got answer for session {session_id:?}"); + let Some(session) = state.sessions.get(&session_id) else { + gst::error!(CAT, obj: element , "no session {session_id:?}"); + return; + }; + session.on_answer_created(reply, &element); + } + )); // We cannot emit `create-answer` from here. The promise function // of the answer needs the state lock which is held by the caller @@ -843,7 +876,7 @@ impl BaseWebRTCSrc { error: signaller.connect_closure( "error", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaller: glib::Object, error: String| { gst::element_error!( instance, @@ -856,7 +889,7 @@ impl BaseWebRTCSrc { session_started: signaller.connect_closure( "session-started", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaller: glib::Object, session_id: &str, _peer_id: &str| { @@ -869,7 +902,7 @@ impl BaseWebRTCSrc { session_ended: signaller.connect_closure( "session-ended", false, - glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{ + glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str|{ let this = instance.imp(); let state = this.state.lock().unwrap(); let Some(session) = state.sessions.get(session_id) else { @@ -900,7 +933,7 @@ impl BaseWebRTCSrc { request_meta: signaller.connect_closure( "request-meta", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaller: glib::Object| -> Option { instance.imp().settings.lock().unwrap().meta.clone() }), @@ -909,7 +942,7 @@ impl BaseWebRTCSrc { session_description: signaller.connect_closure( "session-description", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaller: glib::Object, session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription| { @@ -934,7 +967,7 @@ impl BaseWebRTCSrc { handle_ice: signaller.connect_closure( "handle-ice", false, - glib::closure!(@watch instance => move | + glib::closure!(#[watch] instance, move | _signaller: glib::Object, session_id: &str, sdp_m_line_index: u32, @@ -1079,90 +1112,110 @@ impl BaseWebRTCSrc { let bin = gst::Bin::new(); - bin.connect_pad_removed( - glib::clone!(@weak self as this, @to-owned session_id => move |_, pad| + bin.connect_pad_removed(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |_, pad| { let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.get_mut(&session_id) else { gst::warning!(CAT, imp: this, "session {session_id:?} not found"); - return + return; }; session.flow_combiner.lock().unwrap().remove_pad(pad); - ), - ); - bin.connect_pad_added( - glib::clone!(@weak self as this, @to-owned session_id => move |_, pad| + } + )); + bin.connect_pad_added(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |_, pad| { let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.get_mut(&session_id) else { gst::warning!(CAT, imp: this, "session {session_id:?} not found"); - return + return; }; session.flow_combiner.lock().unwrap().add_pad(pad); + } + )); - ), - ); - - webrtcbin.connect_pad_added( - glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { + webrtcbin.connect_pad_added(glib::clone!( + #[weak(rename_to = this)] + self, + #[weak] + bin, + #[to_owned] + session_id, + move |_webrtcbin, pad| { if pad.direction() == gst::PadDirection::Sink { return; } let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.get_mut(&session_id) else { gst::error!(CAT, imp: this, "session {session_id:?} not found"); - return + return; }; let bin_ghostpad = session.handle_webrtc_src_pad(&bin, pad, &this.obj()); drop(state); bin.add_pad(&bin_ghostpad) .expect("Adding ghostpad to the bin should always work"); - }), - ); + } + )); - webrtcbin.connect_pad_removed( - glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { - let mut state = this.state.lock().unwrap(); - let Some(session) = state.sessions.get_mut(&session_id) else { - gst::error!(CAT, imp: this, "session {session_id:?} not found"); - return - }; - session.flow_combiner.lock().unwrap().remove_pad(pad); - }), - ); + webrtcbin.connect_pad_removed(glib::clone!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |_webrtcbin, pad| { + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return; + }; + session.flow_combiner.lock().unwrap().remove_pad(pad); + } + )); webrtcbin.connect_closure( "on-ice-candidate", false, - glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | - _webrtcbin: gst::Bin, - sdp_m_line_index: u32, - candidate: String| { - if let Some(ele) = this { - let mut state = ele.state.lock().unwrap(); + glib::closure!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |_webrtcbin: gst::Bin, sdp_m_line_index: u32, candidate: String| { + let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.get_mut(&session_id) else { - gst::error!(CAT, imp: ele, "session {session_id:?} not found"); - return + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return; }; - session.on_ice_candidate(sdp_m_line_index, candidate, &ele.obj()); + session.on_ice_candidate(sdp_m_line_index, candidate, &this.obj()); } - }), + ), ); webrtcbin.connect_closure( "on-data-channel", false, - glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | - _webrtcbin: gst::Bin, - data_channel: glib::Object| { - if let Some(ele) = this { - let mut state = ele.state.lock().unwrap(); + glib::closure!( + #[weak(rename_to = this)] + self, + #[to_owned] + session_id, + move |_webrtcbin: gst::Bin, data_channel: glib::Object| { + let mut state = this.state.lock().unwrap(); let Some(session) = state.sessions.get_mut(&session_id) else { - gst::error!(CAT, imp: ele, "session {session_id:?} not found"); - return + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return; }; - session.on_data_channel(data_channel, &ele.obj()); + session.on_data_channel(data_channel, &this.obj()); } - }), + ), ); bin.add(&webrtcbin).unwrap(); diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index 1ab9b53e..5d10dbe3 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -453,32 +453,34 @@ impl SignallableImpl for WhipClient { glib::closure!(|signaller: &super::WhipClientSignaller, _consumer_identifier: &str, webrtcbin: &gst::Element| { - let obj_weak = signaller.downgrade(); - webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { - let Some(obj) = obj_weak.upgrade() else { - return; - }; + webrtcbin.connect_notify( + Some("ice-gathering-state"), + glib::clone!( + #[weak] + signaller, + move |webrtcbin, _pspec| { + let state = webrtcbin + .property::("ice-gathering-state"); - let state = - webrtcbin.property::("ice-gathering-state"); + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: signaller, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: signaller, "ICE gathering complete"); - match state { - WebRTCICEGatheringState::Gathering => { - gst::info!(CAT, obj: obj, "ICE gathering started"); - } - WebRTCICEGatheringState::Complete => { - gst::info!(CAT, obj: obj, "ICE gathering complete"); + let webrtcbin = webrtcbin.clone(); - let webrtcbin = webrtcbin.clone(); - - RUNTIME.spawn(async move { + RUNTIME.spawn(async move { /* Note that we check for a valid WHIP endpoint in change_state */ - obj.imp().send_offer(&webrtcbin).await + signaller.imp().send_offer(&webrtcbin).await }); + } + _ => (), + } } - _ => (), - } - }); + ), + ); }), ); @@ -657,50 +659,48 @@ impl WhipServer { glib::closure!(|signaller: &super::WhipServerSignaller, _producer_identifier: &str, webrtcbin: &gst::Element| { - let obj_weak = signaller.downgrade(); - webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { - let obj = match obj_weak.upgrade() { - Some(obj) => obj, - None => return, - }; + webrtcbin.connect_notify( + Some("ice-gathering-state"), + glib::clone!( + #[weak] + signaller, + move |webrtcbin, _pspec| { + let state = + webrtcbin.property::("ice-gathering-state"); - let state = webrtcbin.property::("ice-gathering-state"); - - match state { - WebRTCICEGatheringState::Gathering => { - gst::info!(CAT, obj: obj, "ICE gathering started"); - } - WebRTCICEGatheringState::Complete => { - gst::info!(CAT, obj: obj, "ICE gathering complete"); - let ans: Option; - let mut settings = obj.imp().settings.lock().unwrap(); - if let Some(answer_desc) = webrtcbin - .property::>("local-description") - { - ans = Some(answer_desc.sdp().to_owned()); - } else { - ans = None; - } - let tx = settings - .sdp_answer - .take() - .expect("SDP answer Sender needs to be valid"); - - let obj_weak = obj.downgrade(); - RUNTIME.spawn(async move { - let obj = match obj_weak.upgrade() { - Some(obj) => obj, - None => return, - }; - - if let Err(e) = tx.send(ans).await { - gst::error!(CAT, obj: obj, "Failed to send SDP {e}"); + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: signaller, "ICE gathering started"); } - }); + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: signaller, "ICE gathering complete"); + let ans: Option; + let mut settings = signaller.imp().settings.lock().unwrap(); + if let Some(answer_desc) = webrtcbin + .property::>( + "local-description", + ) + { + ans = Some(answer_desc.sdp().to_owned()); + } else { + ans = None; + } + let tx = settings + .sdp_answer + .take() + .expect("SDP answer Sender needs to be valid"); + + RUNTIME.spawn(glib::clone!(#[strong] signaller, async move { + if let Err(e) = tx.send(ans).await { + gst::error!(CAT, obj: signaller, "Failed to send SDP {e}"); + } + })); + } + _ => (), + } } - _ => (), - } - }); + ), + ); }) } @@ -940,37 +940,29 @@ impl WhipServer { let prefix = warp::path(ROOT); - let self_weak = self.downgrade(); - // POST /endpoint let post_filter = warp::post() .and(warp::path(ENDPOINT_PATH)) .and(warp::path::end()) .and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP)) .and(warp::body::bytes()) - .and_then(move |body| { - let s = self_weak.upgrade(); - async { - let self_ = s.expect("Need to have the ObjectRef"); - self_.post_handler(body).await - } - }); - - let self_weak = self.downgrade(); + .and_then(glib::clone!( + #[weak(rename_to = self_)] + self, + #[upgrade_or_panic] + move |body| async move { self_.post_handler(body).await } + )); // OPTIONS /endpoint let options_filter = warp::options() .and(warp::path(ENDPOINT_PATH)) .and(warp::path::end()) - .and_then(move || { - let s = self_weak.upgrade(); - async { - let self_ = s.expect("Need to have the ObjectRef"); - self_.options_handler().await - } - }); - - let self_weak = self.downgrade(); + .and_then(glib::clone!( + #[weak(rename_to = self_)] + self, + #[upgrade_or_panic] + move || async move { self_.options_handler().await } + )); // PATCH /resource/:id let patch_filter = warp::patch() @@ -981,28 +973,24 @@ impl WhipServer { CONTENT_TYPE.as_str(), CONTENT_TRICKLE_ICE, )) - .and_then(move |id| { - let s = self_weak.upgrade(); - async { - let self_ = s.expect("Need to have the ObjectRef"); - self_.patch_handler(id).await - } - }); - - let self_weak = self.downgrade(); + .and_then(glib::clone!( + #[weak(rename_to = self_)] + self, + #[upgrade_or_panic] + move |id| async move { self_.patch_handler(id).await } + )); // DELETE /resource/:id let delete_filter = warp::delete() .and(warp::path(RESOURCE_PATH)) .and(warp::path::param::()) .and(warp::path::end()) - .and_then(move |id| { - let s = self_weak.upgrade(); - async { - let self_ = s.expect("Need to have the ObjectRef"); - self_.delete_handler(id).await - } - }); + .and_then(glib::clone!( + #[weak(rename_to = self_)] + self, + #[upgrade_or_panic] + move |id| async move { self_.delete_handler(id).await } + )); let api = prefix .and(post_filter)