forked from mirrors/gstreamer-rs
tutorials: Use async-channel instead of the glib MainContext channel
The latter will be removed in favour of using async code in the future, and async code generally allows for more flexible message handling than the callback based MainContext channel. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1333>
This commit is contained in:
parent
130dc49b22
commit
fc4a0d29c6
3 changed files with 94 additions and 87 deletions
|
@ -16,6 +16,8 @@ gst-pbutils = { package = "gstreamer-pbutils", path = "../gstreamer-pbutils" }
|
||||||
byte-slice-cast = "1"
|
byte-slice-cast = "1"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
termion = { version = "2", optional = true }
|
termion = { version = "2", optional = true }
|
||||||
|
async-channel = "2.0.0"
|
||||||
|
futures = "0.3"
|
||||||
|
|
||||||
[target.'cfg(target_os = "macos")'.dependencies]
|
[target.'cfg(target_os = "macos")'.dependencies]
|
||||||
cocoa = "0.25"
|
cocoa = "0.25"
|
||||||
|
|
|
@ -65,7 +65,7 @@ fn send_seek_event(pipeline: &Element, rate: f64) -> bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is where we get the user input from the terminal.
|
// This is where we get the user input from the terminal.
|
||||||
fn handle_keyboard(ready_tx: glib::Sender<Command>) {
|
fn handle_keyboard(ready_tx: async_channel::Sender<Command>) {
|
||||||
// We set the terminal in "raw mode" so that we can get the keys without waiting for the user
|
// We set the terminal in "raw mode" so that we can get the keys without waiting for the user
|
||||||
// to press return.
|
// to press return.
|
||||||
let _stdout = io::stdout().into_raw_mode().unwrap();
|
let _stdout = io::stdout().into_raw_mode().unwrap();
|
||||||
|
@ -84,7 +84,7 @@ fn handle_keyboard(ready_tx: glib::Sender<Command>) {
|
||||||
_ => continue,
|
_ => continue,
|
||||||
};
|
};
|
||||||
ready_tx
|
ready_tx
|
||||||
.send(command)
|
.send_blocking(command)
|
||||||
.expect("failed to send data through channel");
|
.expect("failed to send data through channel");
|
||||||
if command == Command::Quit {
|
if command == Command::Quit {
|
||||||
break;
|
break;
|
||||||
|
@ -116,7 +116,7 @@ USAGE: Choose one of the following options, then press enter:
|
||||||
let _guard = main_context.acquire().unwrap();
|
let _guard = main_context.acquire().unwrap();
|
||||||
|
|
||||||
// Build the channel to get the terminal inputs from a different thread.
|
// Build the channel to get the terminal inputs from a different thread.
|
||||||
let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT);
|
let (ready_tx, ready_rx) = async_channel::bounded(5);
|
||||||
|
|
||||||
thread::spawn(move || handle_keyboard(ready_tx));
|
thread::spawn(move || handle_keyboard(ready_tx));
|
||||||
|
|
||||||
|
@ -135,51 +135,52 @@ USAGE: Choose one of the following options, then press enter:
|
||||||
let mut playing = true;
|
let mut playing = true;
|
||||||
let mut rate = 1.;
|
let mut rate = 1.;
|
||||||
|
|
||||||
ready_rx.attach(Some(&main_loop.context()), move |command: Command| {
|
main_context.spawn_local(async move {
|
||||||
let pipeline = match pipeline_weak.upgrade() {
|
while let Ok(command) = ready_rx.recv().await {
|
||||||
Some(pipeline) => pipeline,
|
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||||
None => return glib::ControlFlow::Continue,
|
break;
|
||||||
};
|
};
|
||||||
match command {
|
|
||||||
Command::PlayPause => {
|
match command {
|
||||||
let status = if playing {
|
Command::PlayPause => {
|
||||||
let _ = pipeline.set_state(State::Paused);
|
let status = if playing {
|
||||||
"PAUSE"
|
let _ = pipeline.set_state(State::Paused);
|
||||||
} else {
|
"PAUSE"
|
||||||
let _ = pipeline.set_state(State::Playing);
|
} else {
|
||||||
"PLAYING"
|
let _ = pipeline.set_state(State::Playing);
|
||||||
};
|
"PLAYING"
|
||||||
playing = !playing;
|
};
|
||||||
println!("Setting state to {status}\r");
|
playing = !playing;
|
||||||
}
|
println!("Setting state to {status}\r");
|
||||||
Command::DataRateUp => {
|
|
||||||
if send_seek_event(&pipeline, rate * 2.) {
|
|
||||||
rate *= 2.;
|
|
||||||
}
|
}
|
||||||
}
|
Command::DataRateUp => {
|
||||||
Command::DataRateDown => {
|
if send_seek_event(&pipeline, rate * 2.) {
|
||||||
if send_seek_event(&pipeline, rate / 2.) {
|
rate *= 2.;
|
||||||
rate /= 2.;
|
}
|
||||||
}
|
}
|
||||||
}
|
Command::DataRateDown => {
|
||||||
Command::ReverseRate => {
|
if send_seek_event(&pipeline, rate / 2.) {
|
||||||
if send_seek_event(&pipeline, rate * -1.) {
|
rate /= 2.;
|
||||||
rate *= -1.;
|
}
|
||||||
}
|
}
|
||||||
}
|
Command::ReverseRate => {
|
||||||
Command::NextFrame => {
|
if send_seek_event(&pipeline, rate * -1.) {
|
||||||
if let Some(video_sink) = pipeline.property::<Option<Element>>("video-sink") {
|
rate *= -1.;
|
||||||
// Send the event
|
}
|
||||||
let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false);
|
}
|
||||||
video_sink.send_event(step);
|
Command::NextFrame => {
|
||||||
println!("Stepping one frame\r");
|
if let Some(video_sink) = pipeline.property::<Option<Element>>("video-sink") {
|
||||||
|
// Send the event
|
||||||
|
let step = Step::new(gst::format::Buffers::ONE, rate.abs(), true, false);
|
||||||
|
video_sink.send_event(step);
|
||||||
|
println!("Stepping one frame\r");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Quit => {
|
||||||
|
main_loop_clone.quit();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Command::Quit => {
|
|
||||||
main_loop_clone.quit();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glib::ControlFlow::Continue
|
|
||||||
});
|
});
|
||||||
|
|
||||||
main_loop.run();
|
main_loop.run();
|
||||||
|
|
|
@ -15,7 +15,7 @@ enum Command {
|
||||||
Quit,
|
Quit,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_keyboard(ready_tx: glib::Sender<Command>) {
|
fn handle_keyboard(ready_tx: async_channel::Sender<Command>) {
|
||||||
let mut stdin = termion::async_stdin().keys();
|
let mut stdin = termion::async_stdin().keys();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -36,7 +36,7 @@ fn handle_keyboard(ready_tx: glib::Sender<Command>) {
|
||||||
_ => continue,
|
_ => continue,
|
||||||
};
|
};
|
||||||
ready_tx
|
ready_tx
|
||||||
.send(command.clone())
|
.send_blocking(command.clone())
|
||||||
.expect("Failed to send command to the main thread.");
|
.expect("Failed to send command to the main thread.");
|
||||||
if command == Command::Quit {
|
if command == Command::Quit {
|
||||||
break;
|
break;
|
||||||
|
@ -104,7 +104,7 @@ fn tutorial_main() -> Result<(), Error> {
|
||||||
let _guard = main_context.acquire().unwrap();
|
let _guard = main_context.acquire().unwrap();
|
||||||
|
|
||||||
// Build the channel to get the terminal inputs from a different thread.
|
// Build the channel to get the terminal inputs from a different thread.
|
||||||
let (ready_tx, ready_rx) = glib::MainContext::channel(glib::Priority::DEFAULT);
|
let (ready_tx, ready_rx) = async_channel::bounded(5);
|
||||||
|
|
||||||
// Start the keyboard handling thread
|
// Start the keyboard handling thread
|
||||||
thread::spawn(move || handle_keyboard(ready_tx));
|
thread::spawn(move || handle_keyboard(ready_tx));
|
||||||
|
@ -121,61 +121,65 @@ fn tutorial_main() -> Result<(), Error> {
|
||||||
// Start playing
|
// Start playing
|
||||||
pipeline.set_state(gst::State::Playing)?;
|
pipeline.set_state(gst::State::Playing)?;
|
||||||
|
|
||||||
ready_rx.attach(Some(&main_loop.context()), move |command: Command| {
|
main_context.spawn_local(async move {
|
||||||
let pipeline = match pipeline_weak.upgrade() {
|
while let Ok(command) = ready_rx.recv().await {
|
||||||
Some(pipeline) => pipeline,
|
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||||
None => return glib::ControlFlow::Continue,
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
match command {
|
match command {
|
||||||
Command::UpdateChannel(ref name, increase) => {
|
Command::UpdateChannel(ref name, increase) => {
|
||||||
let balance = pipeline
|
let balance = pipeline
|
||||||
.dynamic_cast_ref::<gst_video::ColorBalance>()
|
.dynamic_cast_ref::<gst_video::ColorBalance>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
update_color_channel(name, increase, balance);
|
update_color_channel(name, increase, balance);
|
||||||
print_current_values(&pipeline);
|
print_current_values(&pipeline);
|
||||||
}
|
}
|
||||||
Command::Quit => {
|
Command::Quit => {
|
||||||
main_loop_clone.quit();
|
main_loop_clone.quit();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glib::ControlFlow::Continue
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Handle bus errors / EOS correctly
|
// Handle bus errors / EOS correctly
|
||||||
let main_loop_clone = main_loop.clone();
|
let main_loop_clone = main_loop.clone();
|
||||||
let bus = pipeline.bus().unwrap();
|
let bus = pipeline.bus().unwrap();
|
||||||
|
let mut bus_stream = bus.stream();
|
||||||
let pipeline_weak = pipeline.downgrade();
|
let pipeline_weak = pipeline.downgrade();
|
||||||
let _bus_watch = bus.add_watch(move |_bus, message| {
|
main_context.spawn_local(async move {
|
||||||
use gst::MessageView;
|
use futures::prelude::*;
|
||||||
|
|
||||||
let pipeline = match pipeline_weak.upgrade() {
|
while let Some(message) = bus_stream.next().await {
|
||||||
Some(pipeline) => pipeline,
|
use gst::MessageView;
|
||||||
None => return glib::ControlFlow::Continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
match message.view() {
|
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||||
MessageView::Error(err) => {
|
break;
|
||||||
eprintln!(
|
};
|
||||||
"Error received from element {:?} {}",
|
|
||||||
err.src().map(|s| s.path_string()),
|
match message.view() {
|
||||||
err.error()
|
MessageView::Error(err) => {
|
||||||
);
|
eprintln!(
|
||||||
eprintln!("Debugging information: {:?}", err.debug());
|
"Error received from element {:?} {}",
|
||||||
main_loop_clone.quit();
|
err.src().map(|s| s.path_string()),
|
||||||
glib::ControlFlow::Break
|
err.error()
|
||||||
|
);
|
||||||
|
eprintln!("Debugging information: {:?}", err.debug());
|
||||||
|
main_loop_clone.quit();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
MessageView::Eos(..) => {
|
||||||
|
println!("Reached end of stream");
|
||||||
|
pipeline
|
||||||
|
.set_state(gst::State::Ready)
|
||||||
|
.expect("Unable to set the pipeline to the `Ready` state");
|
||||||
|
main_loop_clone.quit();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
}
|
}
|
||||||
MessageView::Eos(..) => {
|
|
||||||
println!("Reached end of stream");
|
|
||||||
pipeline
|
|
||||||
.set_state(gst::State::Ready)
|
|
||||||
.expect("Unable to set the pipeline to the `Ready` state");
|
|
||||||
main_loop_clone.quit();
|
|
||||||
glib::ControlFlow::Break
|
|
||||||
}
|
|
||||||
_ => glib::ControlFlow::Continue,
|
|
||||||
}
|
}
|
||||||
})?;
|
});
|
||||||
|
|
||||||
// Print initial values for all channels
|
// Print initial values for all channels
|
||||||
print_current_values(&pipeline);
|
print_current_values(&pipeline);
|
||||||
|
|
Loading…
Reference in a new issue