From 47159ad3c224e01e634a66d0afa3baa43066c228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 14 Apr 2023 12:46:43 +0300 Subject: [PATCH] Make sure to keep around and drop bus watches after usage in all the examples --- generic/threadshare/examples/benchmark.rs | 39 +++--- .../threadshare/examples/standalone/main.rs | 67 +++++----- .../examples/udpsrc_benchmark_sender.rs | 45 +++---- generic/threadshare/tests/pipeline.rs | 116 +++++++++--------- .../examples/gtk_fallbackswitch.rs | 49 ++++---- utils/livesync/examples/gtk_livesync.rs | 10 +- utils/togglerecord/examples/gtk_recording.rs | 49 ++++---- video/gtk4/examples/gtksink.rs | 48 ++++---- 8 files changed, 217 insertions(+), 206 deletions(-) diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index 9ecfce63..3d8a893c 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -192,27 +192,28 @@ fn main() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - match msg.view() { - MessageView::Eos(..) => l_clone.quit(), - MessageView::Error(err) => { - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => l_clone.quit(), + MessageView::Error(err) => { + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index ff1edc41..54340ef4 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -108,45 +108,46 @@ fn main() { let terminated_count = Arc::new(AtomicU32::new(0)); let pipeline_clone = pipeline.clone(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - match msg.view() { - MessageView::Eos(_) => { - // Actually, we don't post EOS (see sinks impl). - gst::info!(CAT, "Received eos"); - l_clone.quit(); + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Eos(_) => { + // Actually, we don't post EOS (see sinks impl). + gst::info!(CAT, "Received eos"); + l_clone.quit(); - glib::Continue(false) - } - MessageView::Error(msg) => { - if let gst::MessageView::Error(msg) = msg.message().view() { - if msg.error().matches(gst::LibraryError::Shutdown) { - if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { - gst::info!(CAT, "Received all shutdown requests"); - l_clone.quit(); + glib::Continue(false) + } + MessageView::Error(msg) => { + if let gst::MessageView::Error(msg) = msg.message().view() { + if msg.error().matches(gst::LibraryError::Shutdown) { + if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { + gst::info!(CAT, "Received all shutdown requests"); + l_clone.quit(); - return glib::Continue(false); - } else { - return glib::Continue(true); + return glib::Continue(false); + } else { + return glib::Continue(true); + } } } + + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l_clone.quit(); + + glib::Continue(false) } - - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l_clone.quit(); - - glib::Continue(false) + _ => glib::Continue(true), } - _ => glib::Continue(true), - } - }) - .expect("Failed to add bus watch"); + }) + .expect("Failed to add bus watch"); gst::info!(CAT, "Switching to Ready"); let start = Instant::now(); diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs index 497225d4..bb7ce23f 100644 --- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs +++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs @@ -170,31 +170,32 @@ fn run(pipeline: gst::Pipeline) { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; - match msg.view() { - MessageView::Eos(_) => { - gst::info!(CAT, "Received eos"); - l_clone.quit(); + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; + match msg.view() { + MessageView::Eos(_) => { + gst::info!(CAT, "Received eos"); + l_clone.quit(); - glib::Continue(false) - } - MessageView::Error(msg) => { - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l_clone.quit(); + glib::Continue(false) + } + MessageView::Error(msg) => { + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l_clone.quit(); - glib::Continue(false) + glib::Continue(false) + } + _ => glib::Continue(true), } - _ => glib::Continue(true), - } - }) - .expect("Failed to add bus watch"); + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); l.run(); diff --git a/generic/threadshare/tests/pipeline.rs b/generic/threadshare/tests/pipeline.rs index 91e5882a..d27eef6e 100644 --- a/generic/threadshare/tests/pipeline.rs +++ b/generic/threadshare/tests/pipeline.rs @@ -135,38 +135,39 @@ fn multiple_contexts_queue() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.src() { - if source.type_() == gst::Pipeline::static_type() - && state_changed.old() == gst::State::Paused - && state_changed.current() == gst::State::Playing - { - if let Some(test_scenario) = test_scenario.take() { - std::thread::spawn(test_scenario); + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.src() { + if source.type_() == gst::Pipeline::static_type() + && state_changed.old() == gst::State::Paused + && state_changed.current() == gst::State::Playing + { + if let Some(test_scenario) = test_scenario.take() { + std::thread::spawn(test_scenario); + } } } } - } - MessageView::Error(err) => { - gst::error!( - CAT, - "multiple_contexts_queue: Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst::error!( + CAT, + "multiple_contexts_queue: Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .unwrap(); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -281,38 +282,39 @@ fn multiple_contexts_proxy() { let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - match msg.view() { - MessageView::StateChanged(state_changed) => { - if let Some(source) = state_changed.src() { - if source.type_() == gst::Pipeline::static_type() - && state_changed.old() == gst::State::Paused - && state_changed.current() == gst::State::Playing - { - if let Some(test_scenario) = test_scenario.take() { - std::thread::spawn(test_scenario); + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.src() { + if source.type_() == gst::Pipeline::static_type() + && state_changed.old() == gst::State::Paused + && state_changed.current() == gst::State::Playing + { + if let Some(test_scenario) = test_scenario.take() { + std::thread::spawn(test_scenario); + } } } } - } - MessageView::Error(err) => { - gst::error!( - CAT, - "multiple_contexts_proxy: Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - l_clone.quit(); - } - _ => (), - }; + MessageView::Error(err) => { + gst::error!( + CAT, + "multiple_contexts_proxy: Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .unwrap(); + glib::Continue(true) + }) + .unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); @@ -405,7 +407,7 @@ fn eos() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { @@ -561,7 +563,7 @@ fn premature_shutdown() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { @@ -657,7 +659,7 @@ fn socket_play_null_play() { }); let l_clone = l.clone(); - pipeline + let _bus_watch = pipeline .bus() .unwrap() .add_watch(move |_, msg| { diff --git a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs index 824edd4f..4aaf78dd 100644 --- a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs +++ b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs @@ -154,42 +154,43 @@ fn create_ui(app: >k::Application) { let bus = pipeline.bus().unwrap(); let app_weak = app.downgrade(); - bus.add_watch_local(move |_, msg| { - use gst::MessageView; + let bus_watch = bus + .add_watch_local(move |_, msg| { + use gst::MessageView; - let app = match app_weak.upgrade() { - Some(app) => app, - None => return glib::Continue(false), - }; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; - match msg.view() { - MessageView::Eos(..) => app.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - app.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + app.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); // Pipeline reference is owned by the closure below, so will be // destroyed once the app is destroyed let timeout_id = RefCell::new(Some(timeout_id)); + let bus_watch = RefCell::new(Some(bus_watch)); app.connect_shutdown(move |_| { + drop(bus_watch.borrow_mut().take()); pipeline.set_state(gst::State::Null).unwrap(); - bus.remove_watch().unwrap(); - if let Some(timeout_id) = timeout_id.borrow_mut().take() { timeout_id.remove(); } diff --git a/utils/livesync/examples/gtk_livesync.rs b/utils/livesync/examples/gtk_livesync.rs index 5c9e4f9c..19836cd3 100644 --- a/utils/livesync/examples/gtk_livesync.rs +++ b/utils/livesync/examples/gtk_livesync.rs @@ -10,7 +10,7 @@ use gio::prelude::*; use gst::{glib, prelude::*}; use gtk::prelude::*; -use std::cell::Cell; +use std::cell::{Cell, RefCell}; struct DroppingProbe(glib::WeakRef, Option); @@ -106,7 +106,7 @@ fn create_window(app: >k::Application) { } }); - { + let bus_watch = { let bus = pipeline.bus().unwrap(); let window = window.downgrade(); bus.add_watch_local(move |_, msg| { @@ -136,8 +136,8 @@ fn create_window(app: >k::Application) { glib::Continue(true) }) - .unwrap(); - } + .unwrap() + }; { let pipeline = pipeline.clone(); @@ -148,7 +148,9 @@ fn create_window(app: >k::Application) { }); } + let bus_watch = RefCell::new(Some(bus_watch)); window.connect_unrealize(move |_| { + drop(bus_watch.borrow_mut().take()); pipeline .set_state(gst::State::Null) .expect("Failed to stop pipeline"); diff --git a/utils/togglerecord/examples/gtk_recording.rs b/utils/togglerecord/examples/gtk_recording.rs index 7159a072..272a2d37 100644 --- a/utils/togglerecord/examples/gtk_recording.rs +++ b/utils/togglerecord/examples/gtk_recording.rs @@ -284,42 +284,43 @@ fn create_ui(app: >k::Application) { let bus = pipeline.bus().unwrap(); let app_weak = app.downgrade(); - bus.add_watch_local(move |_, msg| { - use gst::MessageView; + let bus_watch = bus + .add_watch_local(move |_, msg| { + use gst::MessageView; - let app = match app_weak.upgrade() { - Some(app) => app, - None => return glib::Continue(false), - }; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; - match msg.view() { - MessageView::Eos(..) => app.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - app.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + app.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); // Pipeline reference is owned by the closure below, so will be // destroyed once the app is destroyed let timeout_id = RefCell::new(Some(timeout_id)); + let bus_watch = RefCell::new(Some(bus_watch)); app.connect_shutdown(move |_| { + drop(bus_watch.borrow_mut().take()); pipeline.set_state(gst::State::Null).unwrap(); - bus.remove_watch().unwrap(); - if let Some(timeout_id) = timeout_id.borrow_mut().take() { timeout_id.remove(); } diff --git a/video/gtk4/examples/gtksink.rs b/video/gtk4/examples/gtksink.rs index bdb3da2e..3dddce25 100644 --- a/video/gtk4/examples/gtksink.rs +++ b/video/gtk4/examples/gtksink.rs @@ -95,42 +95,44 @@ fn create_ui(app: >k::Application) { .expect("Unable to set the pipeline to the `Playing` state"); let app_weak = app.downgrade(); - bus.add_watch_local(move |_, msg| { - use gst::MessageView; + let bus_watch = bus + .add_watch_local(move |_, msg| { + use gst::MessageView; - let app = match app_weak.upgrade() { - Some(app) => app, - None => return glib::Continue(false), - }; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; - match msg.view() { - MessageView::Eos(..) => app.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - app.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + app.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); let timeout_id = RefCell::new(Some(timeout_id)); let pipeline = RefCell::new(Some(pipeline)); + let bus_watch = RefCell::new(Some(bus_watch)); app.connect_shutdown(move |_| { window.close(); + drop(bus_watch.borrow_mut().take()); if let Some(pipeline) = pipeline.borrow_mut().take() { pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - pipeline.bus().unwrap().remove_watch().unwrap(); } if let Some(timeout_id) = timeout_id.borrow_mut().take() {