From 00001c963375ca1c893dd0ce1475e1f733a20b0e Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Thu, 30 Mar 2023 16:20:23 +0200 Subject: [PATCH] Can listen to tones --- Cargo.lock | 152 +++++++++++++++----------------------------------- Cargo.toml | 10 ++-- src/main.rs | 125 ++++++++++++++++++++--------------------- src/server.rs | 11 +--- 4 files changed, 110 insertions(+), 188 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 939a073..c0e67db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,12 +134,6 @@ dependencies = [ "tower-http", ] -[[package]] -name = "either" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" - [[package]] name = "fnv" version = "1.0.7" @@ -231,8 +225,9 @@ dependencies = [ [[package]] name = "gio-sys" -version = "0.18.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#3e6519c57af5534c51e6761ba3d6483118fc0e46" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b1d43b0d7968b48455244ecafe41192871257f5740aa6b095eb19db78e362a5" dependencies = [ "glib-sys", "gobject-sys", @@ -243,8 +238,9 @@ dependencies = [ [[package]] name = "glib" -version = "0.18.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#3e6519c57af5534c51e6761ba3d6483118fc0e46" +version = "0.17.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfb53061756195d76969292c2d2e329e01259276524a9bae6c9b73af62854773" dependencies = [ "bitflags", "futures-channel", @@ -265,8 +261,9 @@ dependencies = [ [[package]] name = "glib-macros" -version = "0.18.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#3e6519c57af5534c51e6761ba3d6483118fc0e46" +version = "0.17.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e73a9790e243f6d55d8e302426419f6084a1de7a84cd07f7268300408a19de" dependencies = [ "anyhow", "heck", @@ -279,8 +276,9 @@ dependencies = [ [[package]] name = "glib-sys" -version = "0.18.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#3e6519c57af5534c51e6761ba3d6483118fc0e46" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f00ad0a1bf548e61adfff15d83430941d9e1bb620e334f779edd1c745680a5" dependencies = [ "libc", "system-deps", @@ -288,8 +286,9 @@ dependencies = [ [[package]] name = "gobject-sys" -version = "0.18.0" -source = "git+https://github.com/gtk-rs/gtk-rs-core#3e6519c57af5534c51e6761ba3d6483118fc0e46" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15e75b0000a64632b2d8ca3cf856af9308e3a970844f6e9659bd197f026793d0" dependencies = [ "glib-sys", "libc", @@ -298,8 +297,9 @@ dependencies = [ [[package]] name = "gstreamer" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c46cc10a7ab79329feb68bef54a242ced84c3147cc1b81bc5c6140346a1dbf9" dependencies = [ "bitflags", "cfg-if", @@ -308,7 +308,6 @@ dependencies = [ "futures-util", "glib", "gstreamer-sys", - "itertools", "libc", "muldiv", "num-integer", @@ -323,8 +322,9 @@ dependencies = [ [[package]] name = "gstreamer-app" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa1550d18fe8d97900148cc97d63a3212c3d53169c8469b9bf617de8953c05a8" dependencies = [ "bitflags", "futures-core", @@ -339,8 +339,9 @@ dependencies = [ [[package]] name = "gstreamer-app-sys" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7cb5375aa8c23012ec5fadde1a37b972e87680776772669d2628772867056e2" dependencies = [ "glib-sys", "gstreamer-base-sys", @@ -351,8 +352,9 @@ dependencies = [ [[package]] name = "gstreamer-audio" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca6d26ab15835a268939e2367ed4ddb1e7157b03d0bb56ba4a0b036c1ac8393" dependencies = [ "bitflags", "cfg-if", @@ -366,8 +368,9 @@ dependencies = [ [[package]] name = "gstreamer-audio-sys" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d4001b779e4707b32acd6ec0960e327b926369c1a34f7c41d477ac42b2670e8" dependencies = [ "glib-sys", "gobject-sys", @@ -379,8 +382,9 @@ dependencies = [ [[package]] name = "gstreamer-base" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5598bfedbff12675a6cddbe420b6a3ba5039c64aaf7df130db6339d09b634b0e" dependencies = [ "atomic_refcell", "bitflags", @@ -393,8 +397,9 @@ dependencies = [ [[package]] name = "gstreamer-base-sys" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26114ed96f6668380f5a1554128159e98e06c3a7a8460f216d7cd6dce28f928c" dependencies = [ "glib-sys", "gobject-sys", @@ -405,8 +410,9 @@ dependencies = [ [[package]] name = "gstreamer-sys" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e56fe047adef7d47dbafa8bc1340fddb53c325e16574763063702fc94b5786d2" dependencies = [ "glib-sys", "gobject-sys", @@ -416,8 +422,9 @@ dependencies = [ [[package]] name = "gstreamer-video" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "467cddb6a4135e72fefb6ba21262b1cca5493e9928792e88fe672ec0a37b761c" dependencies = [ "bitflags", "cfg-if", @@ -432,8 +439,9 @@ dependencies = [ [[package]] name = "gstreamer-video-sys" -version = "0.21.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#bc81e5a6a2c0bad0078a24ea7fb61db18fd1007c" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66ddb6112d438aac0004d2db6053a572f92b1c5e0e9d6ff6c71d9245f7f73e46" dependencies = [ "glib-sys", "gobject-sys", @@ -537,15 +545,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "itertools" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.6" @@ -558,16 +557,6 @@ version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" -[[package]] -name = "lock_api" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.17" @@ -680,29 +669,6 @@ dependencies = [ "paste", ] -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-sys", -] - [[package]] name = "paste" version = "1.0.12" @@ -847,15 +813,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags", -] - [[package]] name = "rustversion" version = "1.0.12" @@ -868,12 +825,6 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" -[[package]] -name = "scopeguard" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" - [[package]] name = "serde" version = "1.0.158" @@ -1034,25 +985,12 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", - "tokio-macros", "windows-sys", ] -[[package]] -name = "tokio-macros" -version = "1.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "toml" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 67cf0c6..0bedef9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,14 +5,14 @@ version = "0.1.0" edition = "2021" [dependencies] -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } -gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst = { package = "gstreamer", version = "0.20.3", features = ["v1_20"] } +gst-video = { package = "gstreamer-video", version = "0.20.3" } +gst-audio = { package = "gstreamer-audio", version = "0.20.2" } +gst-app = { package = "gstreamer-app", version = "0.20.0" } anyhow = "1" ctrlc = "3.2" rand = "0.8.5" -tokio = { version = "1.17", features = ["full"] } +tokio = { version = "1.17", features = ["io-util", "sync", "process", "rt-multi-thread"] } axum = "0.6" tower = "0.4" tower-http = { version = "0.4" } diff --git a/src/main.rs b/src/main.rs index dd7cf3d..7bd4034 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use std::thread::sleep; use tokio::runtime::Builder; static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new("main", gst::DebugColorFlags::empty(), Some("Main function")) + gst::DebugCategory::new("hawkear", gst::DebugColorFlags::empty(), Some("Main function")) }); #[derive(Debug, Clone, Copy)] @@ -32,7 +32,7 @@ impl TryFrom<&gst::StructureRef> for DtmfEvent { #[derive(Debug, Clone, Copy)] enum DtmfCommand { Start(i32), - End(Option), + End(i32), } impl DtmfCommand { @@ -42,7 +42,7 @@ impl DtmfCommand { fn end(self) -> Self { match self { - Self::Start(number) => Self::End(Some(number)), + Self::Start(number) => Self::End(number), Self::End(_) => self, } } @@ -56,22 +56,18 @@ impl TryFrom<&gst::StructureRef> for DtmfCommand { if !name.starts_with("dtmf-event") { anyhow::bail!("Not a dtmf-event structure: {name}"); } - let number = structure.get_optional::("number")?; + let number = structure.get::("number")?; if structure.get::("start")? { - Ok(Self::Start(number.ok_or_else(|| { - anyhow::anyhow!("No number specified for start DTMF command") - })?)) + Ok(Self::Start(number)) } else { Ok(Self::End(number)) } } } -impl TryFrom for gst::Structure { - type Error = anyhow::Error; - - fn try_from(event: DtmfCommand) -> anyhow::Result { - let structure = match event { +impl From for gst::Structure { + fn from(event: DtmfCommand) -> Self { + match event { DtmfCommand::Start(number) => gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", true) @@ -79,17 +75,13 @@ impl TryFrom for gst::Structure { .field("volume", 36) .build(), DtmfCommand::End(number) => { - let Some(number) = number else { - anyhow::bail!("Cannot send end DTMF command without a specified number"); - }; gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", false) .field("number", number) .build() } - }; - Ok(structure) + } } } @@ -99,8 +91,9 @@ fn main() -> Result<()> { let pipeline = gst::parse_launch( r#" - dtmfsrc name=src ! mix. - audiotestsrc freq=0 ! audiomixer name=mix ! dtmfdetect ! audioconvert ! autoaudiosink name=audiosink + srtsrc uri="srt://3.221.115.181:6002" ! decodebin name=d ! queue name=vid-queue ! videoconvert ! autovideosink + d. ! queue name=main-audio-queue ! audioconvert name=conv-main ! autoaudiosink name=audiosink + d. ! queue name=dtmf-tones-queue ! deinterleave name=deinter ! audioresample ! audioconvert name=conv-tone ! dtmfdetect ! fakesink name=dtmf_sink "#, )? @@ -130,9 +123,10 @@ fn main() -> Result<()> { MessageView::Element(element) => { match element.structure().unwrap().name().as_str() { "dtmf-event" => { + gst::info!(CAT, "Received DTMF event: {:?}", element.structure().unwrap()); let dtmf_event = DtmfEvent::try_from(element.structure().unwrap()) .expect("Failed to parse DTMF event"); - gst::info!(CAT, "Detected DTMF event: {:?}", dtmf_event); + gst::info!(CAT, "Parsed the detected DTMF event: {:?}", dtmf_event); } "dtmf-event-processed" => { let dtmf_cmd = match DtmfCommand::try_from(element.structure().unwrap()) @@ -150,10 +144,10 @@ fn main() -> Result<()> { }; match dtmf_cmd { DtmfCommand::Start(number) => { - gst::info!(CAT, "Processed DTMF event {}", number); + gst::trace!(CAT, "Processed DTMF event {number}"); } - DtmfCommand::End(_) => { - gst::info!(CAT, "Processed ending DTMF event"); + DtmfCommand::End(number) => { + gst::trace!(CAT, "Processed ending DTMF event: {number}"); } } } @@ -173,49 +167,48 @@ fn main() -> Result<()> { }) .expect("Failed to add bus watch"); - thread::spawn({ - let pipeline_weak = pipeline.downgrade(); - move || { - let Some(pipeline) = pipeline_weak.upgrade() else { - gst::error!(CAT, "Pipeline gone..."); - return; - }; - - let source = pipeline.by_name("src").unwrap(); - - // wait pipeline to be running - let bus = pipeline.bus().unwrap(); - while let Some(msg) = bus.timed_pop(None) { - use gst::MessageView; - if let MessageView::StateChanged(state_changed) = msg.view() { - if state_changed.src().unwrap() == &source - && state_changed.current() == gst::State::Playing - { - break; - } - } - } - gst::info!(CAT, "Pipeline is running"); - - let mut rng = rand::thread_rng(); - loop { - let dtmf_cmd = DtmfCommand::start(rng.gen_range(0..15)); - - source.send_event(gst::event::CustomUpstream::new( - dtmf_cmd.try_into().unwrap(), - )); - gst::info!(CAT, "Sent DTMF event {:?}", dtmf_cmd); - - sleep(std::time::Duration::from_millis(10)); - - source.send_event(gst::event::CustomUpstream::new( - dtmf_cmd.end().try_into().unwrap(), - )); - - sleep(std::time::Duration::from_millis(1000)); - } - } - }); + // thread::spawn({ + // let pipeline_weak = pipeline.downgrade(); + // move || { + // let Some(pipeline) = pipeline_weak.upgrade() else { + // gst::error!(CAT, "Pipeline gone..."); + // return; + // }; + // + // let source = pipeline.by_name("src").unwrap(); + // // + // // // wait pipeline to be running + // // let bus = pipeline.bus().unwrap(); + // // while let Some(msg) = bus.timed_pop(None) { + // // use gst::MessageView; + // // if let MessageView::StateChanged(state_changed) = msg.view() { + // // if state_changed.src().unwrap() == &source + // // && state_changed.current() == gst::State::Playing + // // { + // // break; + // // } + // // } + // // } + // // gst::info!(CAT, "Pipeline is running"); + // + // // sleep(std::time::Duration::from_secs(5)); + // // source.send_event(gst::event::CustomUpstream::new(DtmfCommand::start(0).into())); + // // gst::info!(CAT, "Sent DTMF event"); + // + // // let mut rng = rand::thread_rng(); + // // loop { + // // let dtmf_cmd = DtmfCommand::start(rng.gen_range(0..15)); + // + // // source.send_event(gst::event::CustomUpstream::new(DtmfCommand::start(0).into())); + // // gst::info!(CAT, "Sent DTMF event {:?}", dtmf_cmd); + // + // + // // source.send_event(gst::event::CustomUpstream::new(dtmf_cmd.end().into())); + // + // // sleep(std::time::Duration::from_millis(1000)); + // // } + // } + // }); thread::spawn({ let pipeline_weak = pipeline.downgrade(); diff --git a/src/server.rs b/src/server.rs index c6db67d..4bcfdce 100644 --- a/src/server.rs +++ b/src/server.rs @@ -37,8 +37,7 @@ pub async fn run(port: u16, pipeline_weak: glib::WeakRef) { let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); let router = Router::new() .route("/healthcheck", get(healthcheck)) - .route("/pipeline-diagram", get(pipeline_diagram)) - .route("/pipeline-diagram.png", get(pipeline_diagram_image)) + .route("/pipeline.png", get(pipeline_diagram_image)) .layer( ServiceBuilder::new() .layer(Extension(SharedState::new(RwLock::new(State::new( @@ -60,14 +59,6 @@ async fn healthcheck(Extension(state): Extension) -> Html { } } -async fn pipeline_diagram(Extension(state): Extension) -> Html { - if let Some(pipeline) = &state.read().await.pipeline.upgrade() { - Html(dot_graph(pipeline)) - } else { - Html("

Pipeline gone...

".into()) - } -} - async fn pipeline_diagram_image(Extension(state): Extension) -> impl IntoResponse { if let Some(pipeline) = &state.read().await.pipeline.upgrade() { let headers = response::AppendHeaders([(header::CONTENT_TYPE, "image/png")]);