tutorials: Use async-channel instead of the glib MainContext channel

The latter will be removed in favour of using async code in the future,
and async code generally allows for more flexible message handling than the
callback based MainContext channel.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1337>
This commit is contained in:
Sebastian Dröge 2023-10-30 10:56:39 +02:00
parent 470b727252
commit 0a82caa706
3 changed files with 94 additions and 87 deletions

View file

@ -16,6 +16,8 @@ gst-pbutils = { package = "gstreamer-pbutils", path = "../gstreamer-pbutils", ve
byte-slice-cast = "1"
anyhow = "1"
termion = { version = "2", optional = true }
async-channel = "2.0.0"
futures = "0.3"
[target.'cfg(target_os = "macos")'.dependencies]
cocoa = "0.25"

View file

@ -65,7 +65,7 @@ fn send_seek_event(pipeline: &Element, rate: f64) -> bool {
}
// This is where we get the user input from the terminal.
fn handle_keyboard(ready_tx: glib::Sender<Command>) {
fn handle_keyboard(ready_tx: async_channel::Sender<Command>) {
// We set the terminal in "raw mode" so that we can get the keys without waiting for the user
// to press return.
let _stdout = io::stdout().into_raw_mode().unwrap();
@ -84,7 +84,7 @@ fn handle_keyboard(ready_tx: glib::Sender<Command>) {
_ => continue,
};
ready_tx
.send(command)
.send_blocking(command)
.expect("failed to send data through channel");
if command == Command::Quit {
break;
@ -116,7 +116,7 @@ USAGE: Choose one of the following options, then press enter:
let _guard = main_context.acquire().unwrap();
// Build the channel to get the terminal inputs from a different thread.
let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT);
let (ready_tx, ready_rx) = async_channel::bounded(5);
thread::spawn(move || handle_keyboard(ready_tx));
@ -135,11 +135,12 @@ USAGE: Choose one of the following options, then press enter:
let mut playing = true;
let mut rate = 1.;
ready_rx.attach(Some(&main_loop.context()), move |command: Command| {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::ControlFlow::Continue,
main_context.spawn_local(async move {
while let Ok(command) = ready_rx.recv().await {
let Some(pipeline) = pipeline_weak.upgrade() else {
break;
};
match command {
Command::PlayPause => {
let status = if playing {
@ -179,7 +180,7 @@ USAGE: Choose one of the following options, then press enter:
main_loop_clone.quit();
}
}
glib::ControlFlow::Continue
}
});
main_loop.run();

View file

@ -15,7 +15,7 @@ enum Command {
Quit,
}
fn handle_keyboard(ready_tx: glib::Sender<Command>) {
fn handle_keyboard(ready_tx: async_channel::Sender<Command>) {
let mut stdin = termion::async_stdin().keys();
loop {
@ -36,7 +36,7 @@ fn handle_keyboard(ready_tx: glib::Sender<Command>) {
_ => continue,
};
ready_tx
.send(command.clone())
.send_blocking(command.clone())
.expect("Failed to send command to the main thread.");
if command == Command::Quit {
break;
@ -104,7 +104,7 @@ fn tutorial_main() -> Result<(), Error> {
let _guard = main_context.acquire().unwrap();
// Build the channel to get the terminal inputs from a different thread.
let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT);
let (ready_tx, ready_rx) = async_channel::bounded(5);
// Start the keyboard handling thread
thread::spawn(move || handle_keyboard(ready_tx));
@ -121,10 +121,10 @@ fn tutorial_main() -> Result<(), Error> {
// Start playing
pipeline.set_state(gst::State::Playing)?;
ready_rx.attach(Some(&main_loop.context()), move |command: Command| {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::ControlFlow::Continue,
main_context.spawn_local(async move {
while let Ok(command) = ready_rx.recv().await {
let Some(pipeline) = pipeline_weak.upgrade() else {
break;
};
match command {
@ -139,19 +139,22 @@ fn tutorial_main() -> Result<(), Error> {
main_loop_clone.quit();
}
}
glib::ControlFlow::Continue
}
});
// Handle bus errors / EOS correctly
let main_loop_clone = main_loop.clone();
let bus = pipeline.bus().unwrap();
let mut bus_stream = bus.stream();
let pipeline_weak = pipeline.downgrade();
let _bus_watch = bus.add_watch(move |_bus, message| {
main_context.spawn_local(async move {
use futures::prelude::*;
while let Some(message) = bus_stream.next().await {
use gst::MessageView;
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::ControlFlow::Continue,
let Some(pipeline) = pipeline_weak.upgrade() else {
break;
};
match message.view() {
@ -163,7 +166,7 @@ fn tutorial_main() -> Result<(), Error> {
);
eprintln!("Debugging information: {:?}", err.debug());
main_loop_clone.quit();
glib::ControlFlow::Break
break;
}
MessageView::Eos(..) => {
println!("Reached end of stream");
@ -171,11 +174,12 @@ fn tutorial_main() -> Result<(), Error> {
.set_state(gst::State::Ready)
.expect("Unable to set the pipeline to the `Ready` state");
main_loop_clone.quit();
glib::ControlFlow::Break
break;
}
_ => glib::ControlFlow::Continue,
_ => (),
}
})?;
}
});
// Print initial values for all channels
print_current_values(&pipeline);