diff --git a/examples/src/bin/custom_events.rs b/examples/src/bin/custom_events.rs index 4e0a358b1..847b9b63e 100644 --- a/examples/src/bin/custom_events.rs +++ b/examples/src/bin/custom_events.rs @@ -133,33 +133,34 @@ fn example_main() { // Every message from the bus is passed through this function. Its returnvalue determines // whether the handler wants to be called again. If glib::Continue(false) is returned, the // handler is removed and will never be called again. The mainloop still runs though. - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Eos(..) => { - println!("received eos"); - // An EndOfStream event was sent to the pipeline, so we tell our main loop - // to stop execution here. - main_loop.quit() - } - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - _ => (), - }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Eos(..) => { + println!("received eos"); + // An EndOfStream event was sent to the pipeline, so we tell our main loop + // to stop execution here. + main_loop.quit() + } + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); + } + _ => (), + }; - // Tell the mainloop to continue executing this callback. - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + // Tell the mainloop to continue executing this callback. + glib::Continue(true) + }) + .expect("Failed to add bus watch"); // Operate GStreamer's bus, facilitating GLib's mainloop here. // This function call will block until you tell the mainloop to quit @@ -169,11 +170,6 @@ fn example_main() { pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - - // Remove the watch function from the bus. - // Again: There can always only be one watch function. - // Thus we don't have to tell him which function to remove. - bus.remove_watch().unwrap(); } fn main() { diff --git a/examples/src/bin/d3d11videosink.rs b/examples/src/bin/d3d11videosink.rs index a2018f185..b953158c5 100644 --- a/examples/src/bin/d3d11videosink.rs +++ b/examples/src/bin/d3d11videosink.rs @@ -329,30 +329,31 @@ fn main() -> Result<()> { let main_loop_clone = main_loop.clone(); let bus = playbin.bus().unwrap(); - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Eos(..) => { - println!("received eos"); - main_loop.quit() - } - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - _ => (), - }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Eos(..) => { + println!("received eos"); + main_loop.quit() + } + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .unwrap(); + glib::Continue(true) + }) + .unwrap(); playbin.set_state(gst::State::Playing).unwrap(); diff --git a/examples/src/bin/events.rs b/examples/src/bin/events.rs index 62514a00d..689ffda44 100644 --- a/examples/src/bin/events.rs +++ b/examples/src/bin/events.rs @@ -87,33 +87,34 @@ fn example_main() { // Every message from the bus is passed through this function. Its returnvalue determines // whether the handler wants to be called again. If glib::Continue(false) is returned, the // handler is removed and will never be called again. The mainloop still runs though. - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Eos(..) => { - println!("received eos"); - // An EndOfStream event was sent to the pipeline, so we tell our main loop - // to stop execution here. - main_loop.quit() - } - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - _ => (), - }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Eos(..) => { + println!("received eos"); + // An EndOfStream event was sent to the pipeline, so we tell our main loop + // to stop execution here. + main_loop.quit() + } + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); + } + _ => (), + }; - // Tell the mainloop to continue executing this callback. - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + // Tell the mainloop to continue executing this callback. + glib::Continue(true) + }) + .expect("Failed to add bus watch"); // Operate GStreamer's bus, facilliating GLib's mainloop here. // This function call will block until you tell the mainloop to quit @@ -123,11 +124,6 @@ fn example_main() { pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - - // Remove the watch function from the bus. - // Again: There can always only be one watch function. - // Thus we don't have to tell him which function to remove. - bus.remove_watch().unwrap(); } fn main() { diff --git a/examples/src/bin/gtksink.rs b/examples/src/bin/gtksink.rs index 457135cd4..69f04d222 100644 --- a/examples/src/bin/gtksink.rs +++ b/examples/src/bin/gtksink.rs @@ -103,31 +103,32 @@ fn create_ui(app: >k::Application) { .expect("Unable to set the pipeline to the `Playing` state"); let app_weak = app.downgrade(); - bus.add_watch_local(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch_local(move |_, msg| { + use gst::MessageView; - let app = match app_weak.upgrade() { - Some(app) => app, - None => return glib::Continue(false), - }; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; - match msg.view() { - MessageView::Eos(..) => app.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - app.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + app.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); // Pipeline reference is owned by the closure below, so will be // destroyed once the app is destroyed @@ -149,7 +150,6 @@ fn create_ui(app: >k::Application) { pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - pipeline.bus().unwrap().remove_watch().unwrap(); } if let Some(timeout_id) = timeout_id.borrow_mut().take() { diff --git a/examples/src/bin/gtkvideooverlay.rs b/examples/src/bin/gtkvideooverlay.rs index 535899206..a27aa4145 100644 --- a/examples/src/bin/gtkvideooverlay.rs +++ b/examples/src/bin/gtkvideooverlay.rs @@ -204,31 +204,32 @@ fn create_ui(app: >k::Application) { .expect("Unable to set the pipeline to the `Playing` state"); let app_weak = app.downgrade(); - bus.add_watch_local(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch_local(move |_, msg| { + use gst::MessageView; - let app = match app_weak.upgrade() { - Some(app) => app, - None => return glib::Continue(false), - }; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; - match msg.view() { - MessageView::Eos(..) => app.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - app.quit(); - } - _ => (), - }; + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + app.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); // Pipeline reference is owned by the closure below, so will be // destroyed once the app is destroyed @@ -250,7 +251,6 @@ fn create_ui(app: >k::Application) { pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - pipeline.bus().unwrap().remove_watch().unwrap(); } if let Some(timeout_id) = timeout_id.borrow_mut().take() { diff --git a/examples/src/bin/launch_glib_main.rs b/examples/src/bin/launch_glib_main.rs index 34b302d08..71c20b1c7 100644 --- a/examples/src/bin/launch_glib_main.rs +++ b/examples/src/bin/launch_glib_main.rs @@ -35,38 +35,34 @@ fn example_main() { //bus.add_signal_watch(); //bus.connect_message(None, move |_, msg| { - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Eos(..) => main_loop.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - _ => (), - }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Eos(..) => main_loop.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); main_loop.run(); pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - - // Here we remove the bus watch we added above. This avoids a memory leak, that might - // otherwise happen because we moved a strong reference (clone of main_loop) into the - // callback closure above. - bus.remove_watch().unwrap(); } fn main() { diff --git a/examples/src/bin/queries.rs b/examples/src/bin/queries.rs index 7a63bef19..dce7c48d0 100644 --- a/examples/src/bin/queries.rs +++ b/examples/src/bin/queries.rs @@ -94,27 +94,28 @@ fn example_main() { let main_loop_clone = main_loop.clone(); //bus.add_signal_watch(); //bus.connect_message(None, move |_, msg| { - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Eos(..) => main_loop.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - _ => (), - }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Eos(..) => main_loop.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); + } + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); main_loop.run(); @@ -122,7 +123,6 @@ fn example_main() { .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); - bus.remove_watch().unwrap(); timeout_id.remove(); } diff --git a/gstreamer/Gir.toml b/gstreamer/Gir.toml index a35795252..e004a39f1 100644 --- a/gstreamer/Gir.toml +++ b/gstreamer/Gir.toml @@ -586,6 +586,7 @@ final_type = true [[object.function]] name = "remove_watch" + visibility = "crate" [object.function.return] bool_return_is_error = "Bus has no event source" diff --git a/gstreamer/src/auto/bus.rs b/gstreamer/src/auto/bus.rs index 03f5ac47c..09d8e20fe 100644 --- a/gstreamer/src/auto/bus.rs +++ b/gstreamer/src/auto/bus.rs @@ -92,7 +92,8 @@ impl Bus { } #[doc(alias = "gst_bus_remove_watch")] - pub fn remove_watch(&self) -> Result<(), glib::error::BoolError> { + #[allow(dead_code)] + pub(crate) fn remove_watch(&self) -> Result<(), glib::error::BoolError> { unsafe { glib::result_from_gboolean!( ffi::gst_bus_remove_watch(self.to_glib_none().0), diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 18c3f86c6..8b92c6d56 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -13,7 +13,7 @@ use futures_util::{stream::FusedStream, StreamExt}; use glib::{ ffi::{gboolean, gpointer}, prelude::*, - source::{Continue, Priority, SourceId}, + source::{Continue, Priority}, translate::*, }; @@ -135,7 +135,7 @@ impl Bus { #[doc(alias = "gst_bus_add_watch")] #[doc(alias = "gst_bus_add_watch_full")] - pub fn add_watch(&self, func: F) -> Result + pub fn add_watch(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + Send + 'static, { @@ -151,14 +151,14 @@ impl Bus { if res == 0 { Err(glib::bool_error!("Bus already has a watch")) } else { - Ok(from_glib(res)) + Ok(BusWatchGuard { bus: self.clone() }) } } } #[doc(alias = "gst_bus_add_watch")] #[doc(alias = "gst_bus_add_watch_full")] - pub fn add_watch_local(&self, func: F) -> Result + pub fn add_watch_local(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + 'static, { @@ -179,7 +179,7 @@ impl Bus { if res == 0 { Err(glib::bool_error!("Bus already has a watch")) } else { - Ok(from_glib(res)) + Ok(BusWatchGuard { bus: self.clone() }) } } } @@ -378,6 +378,22 @@ impl FusedStream for BusStream { } } +// rustdoc-stripper-ignore-next +/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`] +/// +/// When dropped the bus watch is removed from the bus. +#[derive(Debug)] +#[must_use = "if unused the bus watch will immediately be removed"] +pub struct BusWatchGuard { + bus: Bus, +} + +impl Drop for BusWatchGuard { + fn drop(&mut self) { + let _ = self.bus.remove_watch(); + } +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; diff --git a/tutorials/src/bin/basic-tutorial-12.rs b/tutorials/src/bin/basic-tutorial-12.rs index 9893c61b4..d781fd87b 100644 --- a/tutorials/src/bin/basic-tutorial-12.rs +++ b/tutorials/src/bin/basic-tutorial-12.rs @@ -23,62 +23,62 @@ fn tutorial_main() -> Result<(), Error> { let main_loop_clone = main_loop.clone(); let pipeline_weak = pipeline.downgrade(); let bus = pipeline.bus().expect("Pipeline has no bus"); - bus.add_watch(move |_, msg| { - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::Continue(true), - }; - let main_loop = &main_loop_clone; - match msg.view() { - gst::MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - let _ = pipeline.set_state(gst::State::Ready); - main_loop.quit(); - } - gst::MessageView::Eos(..) => { - // end-of-stream - let _ = pipeline.set_state(gst::State::Ready); - main_loop.quit(); - } - gst::MessageView::Buffering(buffering) => { - // If the stream is live, we do not care about buffering - if is_live { - return glib::Continue(true); + let _bus_watch = bus + .add_watch(move |_, msg| { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(true), + }; + let main_loop = &main_loop_clone; + match msg.view() { + gst::MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + let _ = pipeline.set_state(gst::State::Ready); + main_loop.quit(); } + gst::MessageView::Eos(..) => { + // end-of-stream + let _ = pipeline.set_state(gst::State::Ready); + main_loop.quit(); + } + gst::MessageView::Buffering(buffering) => { + // If the stream is live, we do not care about buffering + if is_live { + return glib::Continue(true); + } - let percent = buffering.percent(); - print!("Buffering ({percent}%)\r"); - match std::io::stdout().flush() { - Ok(_) => {} - Err(err) => eprintln!("Failed: {err}"), - }; + let percent = buffering.percent(); + print!("Buffering ({percent}%)\r"); + match std::io::stdout().flush() { + Ok(_) => {} + Err(err) => eprintln!("Failed: {err}"), + }; - // Wait until buffering is complete before start/resume playing - if percent < 100 { + // Wait until buffering is complete before start/resume playing + if percent < 100 { + let _ = pipeline.set_state(gst::State::Paused); + } else { + let _ = pipeline.set_state(gst::State::Playing); + } + } + gst::MessageView::ClockLost(_) => { + // Get a new clock let _ = pipeline.set_state(gst::State::Paused); - } else { let _ = pipeline.set_state(gst::State::Playing); } + _ => (), } - gst::MessageView::ClockLost(_) => { - // Get a new clock - let _ = pipeline.set_state(gst::State::Paused); - let _ = pipeline.set_state(gst::State::Playing); - } - _ => (), - } - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); main_loop.run(); - bus.remove_watch()?; pipeline.set_state(gst::State::Null)?; Ok(()) diff --git a/tutorials/src/bin/playback-tutorial-1.rs b/tutorials/src/bin/playback-tutorial-1.rs index 27f12af4e..702547728 100644 --- a/tutorials/src/bin/playback-tutorial-1.rs +++ b/tutorials/src/bin/playback-tutorial-1.rs @@ -135,7 +135,7 @@ fn tutorial_main() -> Result<(), Error> { let playbin_clone = playbin.clone(); let main_loop_clone = main_loop.clone(); let bus = playbin.bus().unwrap(); - bus.add_watch(move |_bus, message| { + let _bus_watch = bus.add_watch(move |_bus, message| { use gst::MessageView; match message.view() { MessageView::Error(err) => { diff --git a/tutorials/src/bin/playback-tutorial-2.rs b/tutorials/src/bin/playback-tutorial-2.rs index 741252cf4..bd9d4ec58 100644 --- a/tutorials/src/bin/playback-tutorial-2.rs +++ b/tutorials/src/bin/playback-tutorial-2.rs @@ -140,7 +140,7 @@ fn tutorial_main() -> Result<(), Error> { let playbin_clone = playbin.clone(); let main_loop_clone = main_loop.clone(); let bus = playbin.bus().unwrap(); - bus.add_watch(move |_bus, message| { + let _bus_watch = bus.add_watch(move |_bus, message| { use gst::MessageView; match message.view() { MessageView::Error(err) => { diff --git a/tutorials/src/bin/playback-tutorial-4.rs b/tutorials/src/bin/playback-tutorial-4.rs index 39282e8b5..39d73fd07 100644 --- a/tutorials/src/bin/playback-tutorial-4.rs +++ b/tutorials/src/bin/playback-tutorial-4.rs @@ -52,54 +52,55 @@ fn tutorial_main() -> Result<(), Error> { let main_loop_clone = main_loop.clone(); let pipeline_weak = pipeline.downgrade(); let bus = pipeline.bus().unwrap(); - bus.add_watch(move |_, msg| { - use gst::MessageView; + let _bus_watch = bus + .add_watch(move |_, msg| { + use gst::MessageView; - let buffering_level = &buffering_level_clone; - let pipeline = match pipeline_weak.upgrade() { - Some(pipeline) => pipeline, - None => return glib::Continue(false), - }; - let main_loop = &main_loop_clone; - match msg.view() { - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - main_loop.quit(); - } - MessageView::Eos(..) => { - main_loop.quit(); - } - MessageView::Buffering(buffering) => { - // If the stream is live, we do not care about buffering. - if is_live { - return glib::Continue(true); + let buffering_level = &buffering_level_clone; + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(false), + }; + let main_loop = &main_loop_clone; + match msg.view() { + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + main_loop.quit(); } + MessageView::Eos(..) => { + main_loop.quit(); + } + MessageView::Buffering(buffering) => { + // If the stream is live, we do not care about buffering. + if is_live { + return glib::Continue(true); + } - // Wait until buffering is complete before start/resume playing. - let percent = buffering.percent(); - if percent < 100 { + // Wait until buffering is complete before start/resume playing. + let percent = buffering.percent(); + if percent < 100 { + let _ = pipeline.set_state(gst::State::Paused); + } else { + let _ = pipeline.set_state(gst::State::Playing); + } + *buffering_level.lock().unwrap() = percent; + } + MessageView::ClockLost(_) => { + // Get a new clock. let _ = pipeline.set_state(gst::State::Paused); - } else { let _ = pipeline.set_state(gst::State::Playing); } - *buffering_level.lock().unwrap() = percent; - } - MessageView::ClockLost(_) => { - // Get a new clock. - let _ = pipeline.set_state(gst::State::Paused); - let _ = pipeline.set_state(gst::State::Playing); - } - _ => (), - }; + _ => (), + }; - glib::Continue(true) - }) - .expect("Failed to add bus watch"); + glib::Continue(true) + }) + .expect("Failed to add bus watch"); pipeline.connect("deep-notify::temp-location", false, |args| { let download_buffer = args[1].get::().unwrap(); @@ -177,7 +178,6 @@ fn tutorial_main() -> Result<(), Error> { // Shutdown pipeline pipeline.set_state(gst::State::Null)?; - bus.remove_watch()?; timeout_id.remove(); Ok(()) diff --git a/tutorials/src/bin/playback-tutorial-5.rs b/tutorials/src/bin/playback-tutorial-5.rs index b1cfc57d8..8588ccff0 100644 --- a/tutorials/src/bin/playback-tutorial-5.rs +++ b/tutorials/src/bin/playback-tutorial-5.rs @@ -145,7 +145,7 @@ fn tutorial_main() -> Result<(), Error> { let main_loop_clone = main_loop.clone(); let bus = pipeline.bus().unwrap(); let pipeline_weak = pipeline.downgrade(); - bus.add_watch(move |_bus, message| { + let _bus_watch = bus.add_watch(move |_bus, message| { use gst::MessageView; let pipeline = match pipeline_weak.upgrade() {