Compare commits
5 commits
d6ae2282f7
...
5d85a687c1
Author | SHA1 | Date | |
---|---|---|---|
5d85a687c1 | |||
3b046bdff3 | |||
7b56793c52 | |||
c31409d242 | |||
c189d88fdb |
3 changed files with 557 additions and 201 deletions
567
Cargo.lock
generated
567
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -1,11 +1,9 @@
|
||||||
[package]
|
[package]
|
||||||
name = "${REPO_NAME_LOWER}"
|
name = "dfmt-test"
|
||||||
license = "MPL-2.0"
|
license = "MPL-2.0"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||||
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
|
@ -13,7 +11,8 @@ gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org
|
||||||
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
ctrlc = "3.2"
|
ctrlc = "3.2"
|
||||||
|
rand = "0.8.5"
|
||||||
tokio = { version = "1.17", features = ["full"] }
|
tokio = { version = "1.17", features = ["full"] }
|
||||||
axum = "0.5"
|
axum = "0.6"
|
||||||
tower = "0.4"
|
tower = "0.4"
|
||||||
tower-http = { version = "0.3", features = ["add-extension"] }
|
tower-http = { version = "0.4" }
|
||||||
|
|
182
src/main.rs
182
src/main.rs
|
@ -4,20 +4,103 @@ use anyhow::Result;
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
use gst::glib::once_cell::sync::Lazy;
|
use gst::glib::once_cell::sync::Lazy;
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use std::{process, thread};
|
use rand::prelude::*;
|
||||||
|
use std::thread;
|
||||||
|
use std::thread::sleep;
|
||||||
use tokio::runtime::Builder;
|
use tokio::runtime::Builder;
|
||||||
|
|
||||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
gst::DebugCategory::new("main", gst::DebugColorFlags::empty(), Some("Main function"))
|
gst::DebugCategory::new("main", gst::DebugColorFlags::empty(), Some("Main function"))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
struct DtmfEvent(i32);
|
||||||
|
|
||||||
|
impl TryFrom<&gst::StructureRef> for DtmfEvent {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn try_from(structure: &gst::StructureRef) -> anyhow::Result<Self> {
|
||||||
|
let name = structure.name().to_string();
|
||||||
|
if name != "dtmf-event" {
|
||||||
|
anyhow::bail!("Not a dtmf-event structure: {name}");
|
||||||
|
}
|
||||||
|
let number = structure.get::<i32>("number")?;
|
||||||
|
Ok(Self(number))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
enum DtmfCommand {
|
||||||
|
Start(i32),
|
||||||
|
End(Option<i32>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DtmfCommand {
|
||||||
|
fn start(number: i32) -> Self {
|
||||||
|
Self::Start(number)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn end(self) -> Self {
|
||||||
|
match self {
|
||||||
|
Self::Start(number) => Self::End(Some(number)),
|
||||||
|
Self::End(_) => self,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&gst::StructureRef> for DtmfCommand {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn try_from(structure: &gst::StructureRef) -> anyhow::Result<Self> {
|
||||||
|
let name = structure.name().to_string();
|
||||||
|
if !name.starts_with("dtmf-event") {
|
||||||
|
anyhow::bail!("Not a dtmf-event structure: {name}");
|
||||||
|
}
|
||||||
|
let number = structure.get_optional::<i32>("number")?;
|
||||||
|
if structure.get::<bool>("start")? {
|
||||||
|
Ok(Self::Start(number.ok_or_else(|| {
|
||||||
|
anyhow::anyhow!("No number specified for start DTMF command")
|
||||||
|
})?))
|
||||||
|
} else {
|
||||||
|
Ok(Self::End(number))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<DtmfCommand> for gst::Structure {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn try_from(event: DtmfCommand) -> anyhow::Result<Self> {
|
||||||
|
let structure = match event {
|
||||||
|
DtmfCommand::Start(number) => gst::Structure::builder("dtmf-event")
|
||||||
|
.field("type", 1)
|
||||||
|
.field("start", true)
|
||||||
|
.field("number", number)
|
||||||
|
.field("volume", 36)
|
||||||
|
.build(),
|
||||||
|
DtmfCommand::End(number) => {
|
||||||
|
let Some(number) = number else {
|
||||||
|
anyhow::bail!("Cannot send end DTMF command without a specified number");
|
||||||
|
};
|
||||||
|
gst::Structure::builder("dtmf-event")
|
||||||
|
.field("type", 1)
|
||||||
|
.field("start", false)
|
||||||
|
.field("number", number)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(structure)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
gst::init()?;
|
gst::init()?;
|
||||||
|
|
||||||
let pipeline = gst::parse_launch(
|
let pipeline = gst::parse_launch(
|
||||||
r#"
|
r#"
|
||||||
|
|
||||||
videotestsrc ! videoconvert ! timeoverlay shaded-background=true ! gtksink
|
dtmfsrc name=src ! mix.
|
||||||
|
audiotestsrc freq=0 ! audiomixer name=mix ! dtmfdetect ! audioconvert ! autoaudiosink name=audiosink
|
||||||
|
|
||||||
"#,
|
"#,
|
||||||
)?
|
)?
|
||||||
|
@ -27,8 +110,6 @@ fn main() -> Result<()> {
|
||||||
let context = glib::MainContext::default();
|
let context = glib::MainContext::default();
|
||||||
let main_loop = glib::MainLoop::new(Some(&context), false);
|
let main_loop = glib::MainLoop::new(Some(&context), false);
|
||||||
|
|
||||||
pipeline.set_state(gst::State::Playing)?;
|
|
||||||
|
|
||||||
let bus = pipeline.bus().unwrap();
|
let bus = pipeline.bus().unwrap();
|
||||||
bus.add_watch({
|
bus.add_watch({
|
||||||
let main_loop = main_loop.clone();
|
let main_loop = main_loop.clone();
|
||||||
|
@ -38,7 +119,7 @@ fn main() -> Result<()> {
|
||||||
match msg.view() {
|
match msg.view() {
|
||||||
MessageView::Eos(..) => main_loop.quit(),
|
MessageView::Eos(..) => main_loop.quit(),
|
||||||
MessageView::Error(err) => {
|
MessageView::Error(err) => {
|
||||||
gst::error!(CAT, obj: &err.src().unwrap(),
|
gst::error!(CAT, obj: err.src().unwrap(),
|
||||||
"Error from {:?}: {} ({:?})",
|
"Error from {:?}: {} ({:?})",
|
||||||
err.src().map(|s| s.path_string()),
|
err.src().map(|s| s.path_string()),
|
||||||
err.error(),
|
err.error(),
|
||||||
|
@ -46,6 +127,45 @@ fn main() -> Result<()> {
|
||||||
);
|
);
|
||||||
main_loop.quit();
|
main_loop.quit();
|
||||||
}
|
}
|
||||||
|
MessageView::Element(element) => {
|
||||||
|
match element.structure().unwrap().name().as_str() {
|
||||||
|
"dtmf-event" => {
|
||||||
|
let dtmf_event = DtmfEvent::try_from(element.structure().unwrap())
|
||||||
|
.expect("Failed to parse DTMF event");
|
||||||
|
gst::info!(CAT, "Detected DTMF event: {:?}", dtmf_event);
|
||||||
|
}
|
||||||
|
"dtmf-event-processed" => {
|
||||||
|
let dtmf_cmd = match DtmfCommand::try_from(element.structure().unwrap())
|
||||||
|
{
|
||||||
|
Ok(ev) => ev,
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
"Failed to parse DTMF event {:?} with error: {:?}",
|
||||||
|
element.structure().unwrap(),
|
||||||
|
err
|
||||||
|
);
|
||||||
|
return glib::Continue(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match dtmf_cmd {
|
||||||
|
DtmfCommand::Start(number) => {
|
||||||
|
gst::info!(CAT, "Processed DTMF event {}", number);
|
||||||
|
}
|
||||||
|
DtmfCommand::End(_) => {
|
||||||
|
gst::info!(CAT, "Processed ending DTMF event");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
"Received unknown event: {:?}",
|
||||||
|
element.structure().unwrap().name()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
glib::Continue(true)
|
glib::Continue(true)
|
||||||
|
@ -53,6 +173,50 @@ fn main() -> Result<()> {
|
||||||
})
|
})
|
||||||
.expect("Failed to add bus watch");
|
.expect("Failed to add bus watch");
|
||||||
|
|
||||||
|
thread::spawn({
|
||||||
|
let pipeline_weak = pipeline.downgrade();
|
||||||
|
move || {
|
||||||
|
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||||
|
gst::error!(CAT, "Pipeline gone...");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let source = pipeline.by_name("src").unwrap();
|
||||||
|
|
||||||
|
// wait pipeline to be running
|
||||||
|
let bus = pipeline.bus().unwrap();
|
||||||
|
while let Some(msg) = bus.timed_pop(None) {
|
||||||
|
use gst::MessageView;
|
||||||
|
if let MessageView::StateChanged(state_changed) = msg.view() {
|
||||||
|
if state_changed.src().unwrap() == &source
|
||||||
|
&& state_changed.current() == gst::State::Playing
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
gst::info!(CAT, "Pipeline is running");
|
||||||
|
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
loop {
|
||||||
|
let dtmf_cmd = DtmfCommand::start(rng.gen_range(0..15));
|
||||||
|
|
||||||
|
source.send_event(gst::event::CustomUpstream::new(
|
||||||
|
dtmf_cmd.try_into().unwrap(),
|
||||||
|
));
|
||||||
|
gst::info!(CAT, "Sent DTMF event {:?}", dtmf_cmd);
|
||||||
|
|
||||||
|
sleep(std::time::Duration::from_millis(10));
|
||||||
|
|
||||||
|
source.send_event(gst::event::CustomUpstream::new(
|
||||||
|
dtmf_cmd.end().try_into().unwrap(),
|
||||||
|
));
|
||||||
|
|
||||||
|
sleep(std::time::Duration::from_millis(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
thread::spawn({
|
thread::spawn({
|
||||||
let pipeline_weak = pipeline.downgrade();
|
let pipeline_weak = pipeline.downgrade();
|
||||||
move || {
|
move || {
|
||||||
|
@ -66,12 +230,12 @@ fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pipeline.set_state(gst::State::Playing)?;
|
||||||
|
|
||||||
ctrlc::set_handler({
|
ctrlc::set_handler({
|
||||||
let pipeline_weak = pipeline.downgrade();
|
let main_loop = main_loop.clone();
|
||||||
move || {
|
move || {
|
||||||
let pipeline = pipeline_weak.upgrade().unwrap();
|
main_loop.quit();
|
||||||
pipeline.set_state(gst::State::Null).unwrap();
|
|
||||||
process::exit(0);
|
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue