mod server; use anyhow::Result; use gst::glib; use gst::glib::once_cell::sync::Lazy; use gst::prelude::*; use rand::prelude::*; use std::thread; use std::thread::sleep; use tokio::runtime::Builder; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("main", gst::DebugColorFlags::empty(), Some("Main function")) }); #[derive(Debug, Clone, Copy)] struct DtmfEvent(i32); impl TryFrom<&gst::StructureRef> for DtmfEvent { type Error = anyhow::Error; fn try_from(structure: &gst::StructureRef) -> anyhow::Result { let name = structure.name().to_string(); if name != "dtmf-event" { anyhow::bail!("Not a dtmf-event structure: {name}"); } let number = structure.get::("number")?; Ok(Self(number)) } } #[derive(Debug, Clone, Copy)] enum DtmfCommand { Start(i32), End(Option), } impl DtmfCommand { fn start(number: i32) -> Self { Self::Start(number) } fn end(self) -> Self { match self { Self::Start(number) => Self::End(Some(number)), Self::End(_) => self, } } } impl TryFrom<&gst::StructureRef> for DtmfCommand { type Error = anyhow::Error; fn try_from(structure: &gst::StructureRef) -> anyhow::Result { let name = structure.name().to_string(); if !name.starts_with("dtmf-event") { anyhow::bail!("Not a dtmf-event structure: {name}"); } let number = structure.get_optional::("number")?; if structure.get::("start")? { Ok(Self::Start(number.ok_or_else(|| { anyhow::anyhow!("No number specified for start DTMF command") })?)) } else { Ok(Self::End(number)) } } } impl TryFrom for gst::Structure { type Error = anyhow::Error; fn try_from(event: DtmfCommand) -> anyhow::Result { let structure = match event { DtmfCommand::Start(number) => gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", true) .field("number", number) .field("volume", 36) .build(), DtmfCommand::End(number) => { let Some(number) = number else { anyhow::bail!("Cannot send end DTMF command without a specified number"); }; gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", false) .field("number", number) .build() } }; Ok(structure) } } fn main() -> Result<()> { gst::init()?; let pipeline = gst::parse_launch( r#" dtmfsrc name=src ! mix. audiotestsrc freq=0 ! audiomixer name=mix ! dtmfdetect ! audioconvert ! autoaudiosink name=audiosink "#, )? .downcast::() .unwrap(); let context = glib::MainContext::default(); let main_loop = glib::MainLoop::new(Some(&context), false); let bus = pipeline.bus().unwrap(); bus.add_watch({ let main_loop = main_loop.clone(); move |_, msg| { use gst::MessageView; let main_loop = &main_loop; match msg.view() { MessageView::Eos(..) => main_loop.quit(), MessageView::Error(err) => { gst::error!(CAT, obj: err.src().unwrap(), "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), err.error(), err.debug() ); main_loop.quit(); } MessageView::Element(element) => { match element.structure().unwrap().name().as_str() { "dtmf-event" => { let dtmf_event = DtmfEvent::try_from(element.structure().unwrap()) .expect("Failed to parse DTMF event"); gst::info!(CAT, "Detected DTMF event: {:?}", dtmf_event); } "dtmf-event-processed" => { let dtmf_cmd = match DtmfCommand::try_from(element.structure().unwrap()) { Ok(ev) => ev, Err(err) => { gst::error!( CAT, "Failed to parse DTMF event {:?} with error: {:?}", element.structure().unwrap(), err ); return glib::Continue(true); } }; match dtmf_cmd { DtmfCommand::Start(number) => { gst::info!(CAT, "Processed DTMF event {}", number); } DtmfCommand::End(_) => { gst::info!(CAT, "Processed ending DTMF event"); } } } _ => { gst::error!( CAT, "Received unknown event: {:?}", element.structure().unwrap().name() ); } } } _ => (), }; glib::Continue(true) } }) .expect("Failed to add bus watch"); thread::spawn({ let pipeline_weak = pipeline.downgrade(); move || { let Some(pipeline) = pipeline_weak.upgrade() else { gst::error!(CAT, "Pipeline gone..."); return; }; let source = pipeline.by_name("src").unwrap(); // wait pipeline to be running let bus = pipeline.bus().unwrap(); while let Some(msg) = bus.timed_pop(None) { use gst::MessageView; if let MessageView::StateChanged(state_changed) = msg.view() { if state_changed.src().unwrap() == &source && state_changed.current() == gst::State::Playing { break; } } } gst::info!(CAT, "Pipeline is running"); let mut rng = rand::thread_rng(); loop { let dtmf_cmd = DtmfCommand::start(rng.gen_range(0..15)); source.send_event(gst::event::CustomUpstream::new( dtmf_cmd.try_into().unwrap(), )); gst::info!(CAT, "Sent DTMF event {:?}", dtmf_cmd); sleep(std::time::Duration::from_millis(10)); source.send_event(gst::event::CustomUpstream::new( dtmf_cmd.end().try_into().unwrap(), )); sleep(std::time::Duration::from_millis(1000)); } } }); thread::spawn({ let pipeline_weak = pipeline.downgrade(); move || { let runtime = Builder::new_multi_thread() .worker_threads(2) .thread_name("http-server") .enable_all() .build() .unwrap(); runtime.block_on(server::run(8080, pipeline_weak)) } }); pipeline.set_state(gst::State::Playing)?; ctrlc::set_handler({ let main_loop = main_loop.clone(); move || { main_loop.quit(); } })?; main_loop.run(); pipeline.set_state(gst::State::Null)?; bus.remove_watch().unwrap(); Ok(()) }