diff --git a/generic/threadshare/tests/pipeline.rs b/generic/threadshare/tests/pipeline.rs index 77852c66..46470354 100644 --- a/generic/threadshare/tests/pipeline.rs +++ b/generic/threadshare/tests/pipeline.rs @@ -588,3 +588,105 @@ fn premature_shutdown() { l.run(); gst::debug!(CAT, "Stopped main loop for premature_shutdown..."); } + +#[test] +fn socket_play_null_play() { + use gio::{ + prelude::SocketExt, InetAddress, InetSocketAddress, SocketFamily, SocketProtocol, + SocketType, + }; + + const TEST: &str = "socket_play_null_play"; + + init(); + + let l = glib::MainLoop::new(None, false); + let pipeline = gst::Pipeline::new(None); + + let sink = + gst::ElementFactory::make("ts-udpsink", Some(format!("sink-{}", TEST).as_str())).unwrap(); + + let socket = gio::Socket::new( + SocketFamily::Ipv4, + SocketType::Datagram, + SocketProtocol::Udp, + ) + .unwrap(); + socket + .bind( + &InetSocketAddress::new(&InetAddress::from_string("127.0.0.1").unwrap(), 4500), + true, + ) + .unwrap(); + + sink.set_property("socket", &socket); + sink.set_property("context", &TEST); + sink.set_property("context-wait", 20u32); + + pipeline.add(&sink).unwrap(); + + let pipeline_clone = pipeline.clone(); + let l_clone = l.clone(); + let (sender, receiver) = std::sync::mpsc::channel(); + let mut scenario = Some(move || { + gst::debug!(CAT, "{}: to Null", TEST); + pipeline_clone.set_state(gst::State::Null).unwrap(); + + gst::debug!(CAT, "{}: Play again", TEST); + let res = pipeline_clone.set_state(gst::State::Playing); + l_clone.quit(); + sender.send(res).unwrap(); + }); + + let l_clone = l.clone(); + pipeline + .bus() + .unwrap() + .add_watch(move |_, msg| { + use gst::MessageView; + + match msg.view() { + MessageView::StateChanged(state_changed) => { + if let Some(source) = state_changed.src() { + if source.type_() != gst::Pipeline::static_type() { + return glib::Continue(true); + } + if state_changed.old() == gst::State::Paused + && state_changed.current() == gst::State::Playing + { + if let Some(scenario) = scenario.take() { + std::thread::spawn(scenario); + } + } + } + } + MessageView::Error(err) => { + gst::error!( + CAT, + "{}: Error from {:?}: {} ({:?})", + TEST, + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + l_clone.quit(); + } + _ => (), + }; + + glib::Continue(true) + }) + .unwrap(); + + gst::debug!(CAT, "{}: Playing", TEST); + pipeline.set_state(gst::State::Playing).unwrap(); + + gst::debug!(CAT, "Starting main loop for {}...", TEST); + l.run(); + gst::debug!(CAT, "Stopping main loop for {}...", TEST); + let _ = pipeline.set_state(gst::State::Null); + + if let Err(err) = receiver.recv().unwrap() { + panic!("{}: {}", TEST, err); + } +}