webrtcsink: use spawn_blocking instead of call_async

In `webrtcsink`, we terminate a session by setting the session's pipeline to
`Null` like this:

```rust
    pipeline.call_async(|pipeline| {
        [...]
        pipeline.set_state(gst::State::Null);
        [...]
        // the following cvar is awaited in unprepare()
        cvar.notify_one();
    });
```

However, `pipeline.call_async` keeps a ref on the pipeline until it's done,
which means the `cvar` is notified before `pipeline` is actually 'disposed',
which happens in a different thread than `unprepare`'s. [`gst_rtp_bin_dispose`]
releases some resources when the pipeline is unrefed. In some cases, those
resources are actually released after the main thread has returned, leading
various issues.

This commit uses tokio runtime's `spawn_blocking` instead, which allows owning
and disposing of the pipeline before the `cvar` is notified.

[`gst_rtp_bin_dispose`]: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/main/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c#L3108

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1225>
This commit is contained in:
François Laignel 2023-05-26 14:23:03 +02:00
parent a20855dfd9
commit 4cc2498c24

View file

@ -799,13 +799,15 @@ impl State {
let (sessions, _cvar) = &*finalizing_sessions; let (sessions, _cvar) = &*finalizing_sessions;
sessions.lock().unwrap().insert(session_id.clone()); sessions.lock().unwrap().insert(session_id.clone());
session.pipeline.call_async(move |pipeline| { let pipeline = session.pipeline.clone();
RUNTIME.spawn_blocking(move || {
if let Some(stats_collection_handle) = stats_collection_handle { if let Some(stats_collection_handle) = stats_collection_handle {
stats_collection_handle.abort(); stats_collection_handle.abort();
let _ = RUNTIME.block_on(stats_collection_handle); let _ = RUNTIME.block_on(stats_collection_handle);
} }
let _ = pipeline.set_state(gst::State::Null); let _ = pipeline.set_state(gst::State::Null);
drop(pipeline);
let (sessions, cvar) = &*finalizing_sessions; let (sessions, cvar) = &*finalizing_sessions;
let mut sessions = sessions.lock().unwrap(); let mut sessions = sessions.lock().unwrap();