From c0422acf661064b0e36539a858eeb8815bb678da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 27 Jul 2018 13:07:24 +0300 Subject: [PATCH] Fix reference cycles and minor related problems in all examples and tutorials These are now all leak-free. --- examples/src/bin/appsink.rs | 2 +- examples/src/bin/appsrc.rs | 2 +- examples/src/bin/decodebin.rs | 8 ++- examples/src/bin/events.rs | 12 ++-- examples/src/bin/gtksink.rs | 40 +++++++++---- examples/src/bin/gtkvideooverlay.rs | 47 ++++++++++----- examples/src/bin/launch_glib_main.rs | 2 + examples/src/bin/pad_probes.rs | 8 +-- examples/src/bin/player.rs | 8 +-- examples/src/bin/queries.rs | 12 +++- examples/src/bin/rtpfecclient.rs | 12 +++- examples/src/bin/rtpfecserver.rs | 3 + examples/src/bin/rtsp-server-record.rs | 4 +- examples/src/bin/rtsp-server.rs | 4 +- examples/src/bin/toc.rs | 7 ++- tutorials/src/bin/basic-tutorial-3.rs | 15 +++-- tutorials/src/bin/basic-tutorial-5.rs | 81 ++++++++++++++++++++------ tutorials/src/bin/basic-tutorial-8.rs | 43 +++++++++----- 18 files changed, 222 insertions(+), 88 deletions(-) diff --git a/examples/src/bin/appsink.rs b/examples/src/bin/appsink.rs index bd03e396a..11859cb09 100644 --- a/examples/src/bin/appsink.rs +++ b/examples/src/bin/appsink.rs @@ -47,7 +47,7 @@ fn create_pipeline() -> Result { pipeline.add_many(&[&src, &sink])?; src.link(&sink)?; - let appsink = sink.clone() + let appsink = sink .dynamic_cast::() .expect("Sink element is expected to be an appsink!"); diff --git a/examples/src/bin/appsrc.rs b/examples/src/bin/appsrc.rs index ca6ff3e7b..f4b950a01 100644 --- a/examples/src/bin/appsrc.rs +++ b/examples/src/bin/appsrc.rs @@ -46,7 +46,7 @@ fn create_pipeline() -> Result { pipeline.add_many(&[&src, &videoconvert, &sink])?; gst::Element::link_many(&[&src, &videoconvert, &sink])?; - let appsrc = src.clone() + let appsrc = src .dynamic_cast::() .expect("Source element is expected to be an appsrc!"); diff --git a/examples/src/bin/decodebin.rs b/examples/src/bin/decodebin.rs index 6a172b369..0a6d5021a 100644 --- a/examples/src/bin/decodebin.rs +++ b/examples/src/bin/decodebin.rs @@ -53,10 +53,12 @@ fn example_main() -> Result<(), Error> { pipeline.add_many(&[&src, &decodebin])?; gst::Element::link_many(&[&src, &decodebin])?; - // Need to move a new reference into the closure - let pipeline_clone = pipeline.clone(); + let pipeline_weak = pipeline.downgrade(); decodebin.connect_pad_added(move |dbin, src_pad| { - let pipeline = &pipeline_clone; + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; let (is_audio, is_video) = { let media_type = src_pad.get_current_caps().and_then(|caps| { diff --git a/examples/src/bin/events.rs b/examples/src/bin/events.rs index 58244e187..ad364f9a8 100644 --- a/examples/src/bin/events.rs +++ b/examples/src/bin/events.rs @@ -17,11 +17,12 @@ fn example_main() { let ret = pipeline.set_state(gst::State::Playing); assert_ne!(ret, gst::StateChangeReturn::Failure); - let main_loop_clone = main_loop.clone(); - - let pipeline_clone = pipeline.clone(); + let pipeline_weak = pipeline.downgrade(); glib::timeout_add_seconds(5, move || { - let pipeline = &pipeline_clone; + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(false), + }; println!("sending eos"); @@ -33,6 +34,7 @@ fn example_main() { //bus.add_signal_watch(); //bus.connect_message(move |_, msg| { + let main_loop_clone = main_loop.clone(); bus.add_watch(move |_, msg| { use gst::MessageView; @@ -61,6 +63,8 @@ fn example_main() { let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); + + bus.remove_watch(); } fn main() { diff --git a/examples/src/bin/gtksink.rs b/examples/src/bin/gtksink.rs index e23a95034..31aa0939d 100644 --- a/examples/src/bin/gtksink.rs +++ b/examples/src/bin/gtksink.rs @@ -10,9 +10,7 @@ extern crate gtk; use gtk::prelude::*; use std::env; - -extern crate send_cell; -use send_cell::SendCell; +use std::cell::RefCell; fn create_ui(app: >k::Application) { let pipeline = gst::Pipeline::new(None); @@ -45,9 +43,13 @@ fn create_ui(app: >k::Application) { app.add_window(&window); - let pipeline_clone = pipeline.clone(); - gtk::timeout_add(500, move || { - let pipeline = &pipeline_clone; + let pipeline_weak = pipeline.downgrade(); + let timeout_id = gtk::timeout_add(500, move || { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(true), + }; + let position = pipeline .query_position::() .unwrap_or_else(|| 0.into()); @@ -56,9 +58,13 @@ fn create_ui(app: >k::Application) { glib::Continue(true) }); - let app_clone = app.clone(); + let app_weak = app.downgrade(); window.connect_delete_event(move |_, _| { - let app = &app_clone; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return Inhibit(false), + }; + app.quit(); Inhibit(false) }); @@ -68,11 +74,15 @@ fn create_ui(app: >k::Application) { let ret = pipeline.set_state(gst::State::Playing); assert_ne!(ret, gst::StateChangeReturn::Failure); - let app_clone = SendCell::new(app.clone()); + let app_weak = glib::SendWeakRef::from(app.downgrade()); bus.add_watch(move |_, msg| { use gst::MessageView; - let app = app_clone.borrow(); + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; + match msg.view() { MessageView::Eos(..) => gtk::main_quit(), MessageView::Error(err) => { @@ -90,11 +100,17 @@ fn create_ui(app: >k::Application) { glib::Continue(true) }); - let pipeline_clone = pipeline.clone(); + // Pipeline reference is owned by the closure below, so will be + // destroyed once the app is destroyed + let timeout_id = RefCell::new(Some(timeout_id)); app.connect_shutdown(move |_| { - let pipeline = &pipeline_clone; let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); + + bus.remove_watch(); + if let Some(timeout_id) = timeout_id.borrow_mut().take() { + glib::source_remove(timeout_id); + } }); } diff --git a/examples/src/bin/gtkvideooverlay.rs b/examples/src/bin/gtkvideooverlay.rs index 2b43e76cc..928fd465c 100644 --- a/examples/src/bin/gtkvideooverlay.rs +++ b/examples/src/bin/gtkvideooverlay.rs @@ -20,8 +20,7 @@ use std::env; use std::os::raw::c_void; -extern crate send_cell; -use send_cell::SendCell; +use std::cell::RefCell; use std::process; @@ -49,9 +48,13 @@ fn create_ui(app: >k::Application) { video_window.set_size_request(320, 240); let video_overlay = sink.clone() .dynamic_cast::() - .unwrap(); + .unwrap() + .downgrade(); video_window.connect_realize(move |video_window| { - let video_overlay = &video_overlay; + let video_overlay = match video_overlay.upgrade() { + Some(video_overlay) => video_overlay, + None => return, + }; let gdk_window = video_window.get_window().unwrap(); @@ -108,9 +111,13 @@ fn create_ui(app: >k::Application) { app.add_window(&window); - let pipeline_clone = pipeline.clone(); - gtk::timeout_add(500, move || { - let pipeline = &pipeline_clone; + let pipeline_weak = pipeline.downgrade(); + let timeout_id = gtk::timeout_add(500, move || { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(true), + }; + let position = pipeline .query_position::() .unwrap_or_else(|| 0.into()); @@ -119,9 +126,13 @@ fn create_ui(app: >k::Application) { glib::Continue(true) }); - let app_clone = app.clone(); + let app_weak = app.downgrade(); window.connect_delete_event(move |_, _| { - let app = &app_clone; + let app = match app_weak.upgrade() { + Some(app) => app, + None => return Inhibit(false), + }; + app.quit(); Inhibit(false) }); @@ -131,11 +142,15 @@ fn create_ui(app: >k::Application) { let ret = pipeline.set_state(gst::State::Playing); assert_ne!(ret, gst::StateChangeReturn::Failure); - let app_clone = SendCell::new(app.clone()); + let app_weak = glib::SendWeakRef::from(app.downgrade()); bus.add_watch(move |_, msg| { use gst::MessageView; - let app = app_clone.borrow(); + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; + match msg.view() { MessageView::Eos(..) => gtk::main_quit(), MessageView::Error(err) => { @@ -153,11 +168,17 @@ fn create_ui(app: >k::Application) { glib::Continue(true) }); - let pipeline_clone = pipeline.clone(); + // Pipeline reference is owned by the closure below, so will be + // destroyed once the app is destroyed + let timeout_id = RefCell::new(Some(timeout_id)); app.connect_shutdown(move |_| { - let pipeline = &pipeline_clone; let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); + + bus.remove_watch(); + if let Some(timeout_id) = timeout_id.borrow_mut().take() { + glib::source_remove(timeout_id); + } }); } diff --git a/examples/src/bin/launch_glib_main.rs b/examples/src/bin/launch_glib_main.rs index a82b8ae6c..02293864d 100644 --- a/examples/src/bin/launch_glib_main.rs +++ b/examples/src/bin/launch_glib_main.rs @@ -50,6 +50,8 @@ fn example_main() { let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); + + bus.remove_watch(); } fn main() { diff --git a/examples/src/bin/pad_probes.rs b/examples/src/bin/pad_probes.rs index a6a05db17..8d7c1a8fb 100644 --- a/examples/src/bin/pad_probes.rs +++ b/examples/src/bin/pad_probes.rs @@ -17,12 +17,11 @@ fn example_main() { "audiotestsrc name=src ! audio/x-raw,format={},channels=1 ! fakesink", gst_audio::AUDIO_FORMAT_S16.to_string() )).unwrap(); - let bus = pipeline.get_bus().unwrap(); + let pipeline = pipeline + .dynamic_cast::() + .unwrap(); let src = pipeline - .clone() - .dynamic_cast::() - .unwrap() .get_by_name("src") .unwrap(); let src_pad = src.get_static_pad("src").unwrap(); @@ -53,6 +52,7 @@ fn example_main() { let ret = pipeline.set_state(gst::State::Playing); assert_ne!(ret, gst::StateChangeReturn::Failure); + let bus = pipeline.get_bus().unwrap(); while let Some(msg) = bus.timed_pop(gst::CLOCK_TIME_NONE) { use gst::MessageView; diff --git a/examples/src/bin/player.rs b/examples/src/bin/player.rs index 5388656f6..f60f3fd04 100644 --- a/examples/src/bin/player.rs +++ b/examples/src/bin/player.rs @@ -32,21 +32,17 @@ fn main_loop(uri: &str) -> Result<(), Error> { let error = Arc::new(Mutex::new(Ok(()))); - let player_clone = player.clone(); let main_loop_clone = main_loop.clone(); - player.connect_end_of_stream(move |_| { + player.connect_end_of_stream(move |player| { let main_loop = &main_loop_clone; - let player = &player_clone; player.stop(); main_loop.quit(); }); - let player_clone = player.clone(); let main_loop_clone = main_loop.clone(); let error_clone = Arc::clone(&error); - player.connect_error(move |_, err| { + player.connect_error(move |player, err| { let main_loop = &main_loop_clone; - let player = &player_clone; let error = &error_clone; *error.lock().unwrap() = Err(err.clone()); diff --git a/examples/src/bin/queries.rs b/examples/src/bin/queries.rs index d49e51e72..45a0fd96f 100644 --- a/examples/src/bin/queries.rs +++ b/examples/src/bin/queries.rs @@ -23,9 +23,12 @@ fn example_main() { let main_loop_clone = main_loop.clone(); - let pipeline_clone = pipeline.clone(); - glib::timeout_add_seconds(1, move || { - let pipeline = &pipeline_clone; + let pipeline_weak = pipeline.downgrade(); + let timeout_id = glib::timeout_add_seconds(1, move || { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return glib::Continue(true), + }; //let pos = pipeline.query_position(gst::Format::Time).unwrap_or(-1); //let dur = pipeline.query_duration(gst::Format::Time).unwrap_or(-1); @@ -81,6 +84,9 @@ fn example_main() { let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); + + bus.remove_watch(); + glib::source_remove(timeout_id); } fn main() { diff --git a/examples/src/bin/rtpfecclient.rs b/examples/src/bin/rtpfecclient.rs index aabd961b7..1d66c040d 100644 --- a/examples/src/bin/rtpfecclient.rs +++ b/examples/src/bin/rtpfecclient.rs @@ -205,9 +205,14 @@ fn example_main() -> Result<(), Error> { let sinkpad = get_request_pad(&rtpbin, "recv_rtp_sink_0")?; srcpad.link(&sinkpad).into_result()?; - let depay_clone = depay.clone(); + let depay_weak = depay.downgrade(); rtpbin.connect_pad_added(move |rtpbin, src_pad| { - match connect_rtpbin_srcpad(&src_pad, &depay_clone) { + let depay = match depay_weak.upgrade() { + Some(depay) => depay, + None => return, + }; + + match connect_rtpbin_srcpad(&src_pad, &depay) { Ok(_) => (), Err(err) => { gst_element_error!( @@ -245,6 +250,9 @@ fn example_main() -> Result<(), Error> { match msg.view() { MessageView::Eos(..) => break, MessageView::Error(err) => { + let ret = pipeline.set_state(gst::State::Null); + assert_ne!(ret, gst::StateChangeReturn::Failure); + return Err(ErrorMessage { src: msg.get_src() .map(|s| s.get_path_string()) diff --git a/examples/src/bin/rtpfecserver.rs b/examples/src/bin/rtpfecserver.rs index 61cf3664a..8872b093a 100644 --- a/examples/src/bin/rtpfecserver.rs +++ b/examples/src/bin/rtpfecserver.rs @@ -183,6 +183,9 @@ fn example_main() -> Result<(), Error> { match msg.view() { MessageView::Eos(..) => break, MessageView::Error(err) => { + let ret = pipeline.set_state(gst::State::Null); + assert_ne!(ret, gst::StateChangeReturn::Failure); + return Err(ErrorMessage { src: msg.get_src() .map(|s| s.get_path_string()) diff --git a/examples/src/bin/rtsp-server-record.rs b/examples/src/bin/rtsp-server-record.rs index 70973c08b..ea0dbd27d 100644 --- a/examples/src/bin/rtsp-server-record.rs +++ b/examples/src/bin/rtsp-server-record.rs @@ -94,7 +94,7 @@ fn main_loop() -> Result<(), Error> { mounts.add_factory("/test", &factory); - server.attach(None); + let id = server.attach(None); println!( "Stream ready at rtsps://127.0.0.1:{}/test", @@ -103,6 +103,8 @@ fn main_loop() -> Result<(), Error> { main_loop.run(); + glib::source_remove(id); + Ok(()) } diff --git a/examples/src/bin/rtsp-server.rs b/examples/src/bin/rtsp-server.rs index fb3079a7e..1d0bcb466 100644 --- a/examples/src/bin/rtsp-server.rs +++ b/examples/src/bin/rtsp-server.rs @@ -41,7 +41,7 @@ fn main_loop() -> Result<(), Error> { mounts.add_factory("/test", &factory); - server.attach(None); + let id = server.attach(None); println!( "Stream ready at rtsp://127.0.0.1:{}/test", @@ -50,6 +50,8 @@ fn main_loop() -> Result<(), Error> { main_loop.run(); + glib::source_remove(id); + Ok(()) } diff --git a/examples/src/bin/toc.rs b/examples/src/bin/toc.rs index 20021e4b2..e2e527530 100644 --- a/examples/src/bin/toc.rs +++ b/examples/src/bin/toc.rs @@ -30,9 +30,12 @@ fn example_main() { gst::Element::link_many(&[&src, &decodebin]).unwrap(); // Need to move a new reference into the closure - let pipeline_clone = pipeline.clone(); + let pipeline_weak = pipeline.downgrade(); decodebin.connect_pad_added(move |_, src_pad| { - let pipeline = &pipeline_clone; + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; let queue = gst::ElementFactory::make("queue", None).unwrap(); let sink = gst::ElementFactory::make("fakesink", None).unwrap(); diff --git a/tutorials/src/bin/basic-tutorial-3.rs b/tutorials/src/bin/basic-tutorial-3.rs index 84160690e..3e99f2f16 100644 --- a/tutorials/src/bin/basic-tutorial-3.rs +++ b/tutorials/src/bin/basic-tutorial-3.rs @@ -32,11 +32,18 @@ fn tutorial_main() { .expect("Can't set uri property on uridecodebin"); // Connect the pad-added signal - let pipeline_clone = pipeline.clone(); - let convert_clone = convert.clone(); + let pipeline_weak = pipeline.downgrade(); + let convert_weak = convert.downgrade(); source.connect_pad_added(move |_, src_pad| { - let pipeline = &pipeline_clone; - let convert = &convert_clone; + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; + + let convert = match convert_weak.upgrade() { + Some(convert) => convert, + None => return, + }; println!( "Received new pad {} from {}", diff --git a/tutorials/src/bin/basic-tutorial-5.rs b/tutorials/src/bin/basic-tutorial-5.rs index 27e8cc1f7..5e93ef3e6 100644 --- a/tutorials/src/bin/basic-tutorial-5.rs +++ b/tutorials/src/bin/basic-tutorial-5.rs @@ -20,6 +20,33 @@ mod tutorial5 { extern crate gstreamer_video as gst_video; use self::gst_video::prelude::*; + use std::ops; + + // Custom struct to keep our window reference alive + // and to store the timeout id so that we can remove + // it from the main context again later and drop the + // references it keeps inside its closures + struct AppWindow { + main_window: Window, + timeout_id: Option, + } + + impl ops::Deref for AppWindow { + type Target = Window; + + fn deref(&self) -> &Window { + &self.main_window + } + } + + impl Drop for AppWindow { + fn drop(&mut self) { + if let Some(source_id) = self.timeout_id.take() { + glib::source_remove(source_id); + } + } + } + // Extract tags from streams of @stype and add the info in the UI. fn add_streams_info( playbin: &gst::Element, @@ -79,7 +106,7 @@ mod tutorial5 { } // This creates all the GTK+ widgets that compose our application, and registers the callbacks - pub fn create_ui(playbin: &gst::Element) { + fn create_ui(playbin: &gst::Element) -> AppWindow { let main_window = Window::new(WindowType::Toplevel); main_window.connect_delete_event(|_, _| { gtk::main_quit(); @@ -144,7 +171,7 @@ mod tutorial5 { let pipeline = playbin.clone(); let lslider = slider.clone(); // Update the UI (seekbar) every second - gtk::timeout_add_seconds(1, move || { + let timeout_id = gtk::timeout_add_seconds(1, move || { let pipeline = &pipeline; let lslider = &lslider; @@ -228,7 +255,7 @@ mod tutorial5 { let streams_list = gtk::TextView::new(); streams_list.set_editable(false); - let pipeline = playbin.clone(); + let pipeline_weak = playbin.downgrade(); let textbuf = SendCell::new( streams_list .get_buffer() @@ -239,6 +266,11 @@ mod tutorial5 { .unwrap() .connect_message(move |_, msg| match msg.view() { gst::MessageView::Application(application) => { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; + if application.get_structure().map(|s| s.get_name()) == Some("tags-changed") { analyze_streams(&pipeline, &textbuf); } @@ -257,6 +289,11 @@ mod tutorial5 { main_window.set_default_size(640, 480); main_window.show_all(); + + AppWindow { + main_window, + timeout_id: Some(timeout_id), + } } // We are possibly in a GStreamer working thread, so we notify the main @@ -290,43 +327,45 @@ mod tutorial5 { data/media/sintel_trailer-480p.webm"; let playbin = gst::ElementFactory::make("playbin", None).unwrap(); playbin - .set_property("uri", &glib::Value::from(uri)) + .set_property("uri", &uri) .unwrap(); - let pipeline = playbin.clone(); playbin - .connect("video-tags-changed", false, move |_| { + .connect("video-tags-changed", false, |args| { + let pipeline = args[0].get::().unwrap(); post_app_message(&pipeline); None }) .unwrap(); - let pipeline = playbin.clone(); playbin - .connect("audio-tags-changed", false, move |_| { + .connect("audio-tags-changed", false, |args| { + let pipeline = args[0].get::().unwrap(); post_app_message(&pipeline); None }) .unwrap(); - let pipeline = playbin.clone(); playbin - .connect("text-tags-changed", false, move |_| { + .connect("text-tags-changed", false, move |args| { + let pipeline = args[0].get::().unwrap(); post_app_message(&pipeline); None }) .unwrap(); - create_ui(&playbin); - playbin - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + let window = create_ui(&playbin); let bus = playbin.get_bus().unwrap(); - let pipeline = playbin.clone(); bus.add_signal_watch(); - playbin.get_bus().unwrap().connect_message(move |_, msg| { + + let pipeline_weak = playbin.downgrade(); + bus.connect_message(move |_, msg| { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; + match msg.view() { // This is called when an End-Of-Stream message is posted on the bus. // We just set the pipeline to READY (which stops playback). @@ -359,8 +398,16 @@ mod tutorial5 { } }); + playbin + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + gtk::main(); + window.hide(); playbin.set_state(gst::State::Null).into_result().unwrap(); + + bus.remove_signal_watch(); } } diff --git a/tutorials/src/bin/basic-tutorial-8.rs b/tutorials/src/bin/basic-tutorial-8.rs index 5478ff300..d248766f0 100644 --- a/tutorials/src/bin/basic-tutorial-8.rs +++ b/tutorials/src/bin/basic-tutorial-8.rs @@ -127,30 +127,34 @@ fn main() { let audio_caps = info.to_caps().unwrap(); let appsrc = appsrc - .clone() .dynamic_cast::() .expect("Source element is expected to be an appsrc!"); appsrc.set_caps(&audio_caps); appsrc.set_property_format(gst::Format::Time); let appsink = appsink - .clone() .dynamic_cast::() .expect("Sink element is expected to be an appsink!"); let data: Arc> = Arc::new(Mutex::new(CustomData::new(&appsrc, &appsink))); - let data_clone = Arc::clone(&data); + let data_weak = Arc::downgrade(&data); appsrc.connect_need_data(move |_, _size| { - let data = &data_clone; + let data = match data_weak.upgrade() { + Some(data) => data, + None => return, + }; let mut d = data.lock().unwrap(); if d.source_id.is_none() { println!("start feeding"); - let data_clone = Arc::clone(data); + let data_weak = Arc::downgrade(&data); d.source_id = Some(glib::source::idle_add(move || { - let data = &data_clone; + let data = match data_weak.upgrade() { + Some(data) => data, + None => return glib::Continue(false), + }; let (appsrc, buffer) = { let mut data = data.lock().unwrap(); @@ -198,9 +202,12 @@ fn main() { } }); - let data_clone = Arc::clone(&data); + let data_weak = Arc::downgrade(&data); appsrc.connect_enough_data(move |_| { - let data = &data_clone; + let data = match data_weak.upgrade() { + Some(data) => data, + None => return, + }; let mut data = data.lock().unwrap(); if let Some(source) = data.source_id.take() { @@ -213,10 +220,15 @@ fn main() { appsink.set_emit_signals(true); appsink.set_caps(&audio_caps); - let data_clone = Arc::clone(&data); + let data_weak = Arc::downgrade(&data); appsink.connect_new_sample(move |_| { + let data = match data_weak.upgrade() { + Some(data) => data, + None => return gst::FlowReturn::Ok, + }; + let appsink = { - let data = &data_clone.lock().unwrap(); + let data = data.lock().unwrap(); data.appsink.clone() }; @@ -230,10 +242,6 @@ fn main() { gst::FlowReturn::Ok }); - pipeline - .set_state(gst::State::Playing) - .into_result() - .expect("Unable to set the pipeline to the Playing state."); let main_loop = glib::MainLoop::new(None, false); let main_loop_clone = main_loop.clone(); let bus = pipeline.get_bus().unwrap(); @@ -252,10 +260,17 @@ fn main() { }); bus.add_signal_watch(); + pipeline + .set_state(gst::State::Playing) + .into_result() + .expect("Unable to set the pipeline to the Playing state."); + main_loop.run(); pipeline .set_state(gst::State::Null) .into_result() .expect("Unable to set the pipeline to the Null state."); + + bus.remove_signal_watch(); }