mod server; use anyhow::Result; use gst::glib; use gst::glib::once_cell::sync::Lazy; use gst::prelude::*; use rand::prelude::*; use std::thread; use std::thread::sleep; use tokio::runtime::Builder; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("hawkear", gst::DebugColorFlags::empty(), Some("Main function")) }); #[derive(Debug, Clone, Copy)] struct DtmfEvent(i32); impl TryFrom<&gst::StructureRef> for DtmfEvent { type Error = anyhow::Error; fn try_from(structure: &gst::StructureRef) -> anyhow::Result { let name = structure.name().to_string(); if name != "dtmf-event" { anyhow::bail!("Not a dtmf-event structure: {name}"); } let number = structure.get::("number")?; Ok(Self(number)) } } #[derive(Debug, Clone, Copy)] enum DtmfCommand { Start(i32), End(i32), } impl DtmfCommand { fn start(number: i32) -> Self { Self::Start(number) } fn end(self) -> Self { match self { Self::Start(number) => Self::End(number), Self::End(_) => self, } } } impl TryFrom<&gst::StructureRef> for DtmfCommand { type Error = anyhow::Error; fn try_from(structure: &gst::StructureRef) -> anyhow::Result { let name = structure.name().to_string(); if !name.starts_with("dtmf-event") { anyhow::bail!("Not a dtmf-event structure: {name}"); } let number = structure.get::("number")?; if structure.get::("start")? { Ok(Self::Start(number)) } else { Ok(Self::End(number)) } } } impl From for gst::Structure { fn from(event: DtmfCommand) -> Self { match event { DtmfCommand::Start(number) => gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", true) .field("number", number) .field("volume", 36) .build(), DtmfCommand::End(number) => { gst::Structure::builder("dtmf-event") .field("type", 1) .field("start", false) .field("number", number) .build() } } } } fn main() -> Result<()> { gst::init()?; let pipeline = gst::parse_launch( r#" srtsrc uri="srt://3.221.115.181:6002" ! decodebin name=d ! queue name=vid-queue ! videoconvert ! autovideosink d. ! queue name=main-audio-queue ! audioconvert name=conv-main ! autoaudiosink name=audiosink d. ! queue name=dtmf-tones-queue ! deinterleave name=deinter ! audioresample ! audioconvert name=conv-tone ! dtmfdetect ! fakesink name=dtmf_sink "#, )? .downcast::() .unwrap(); let context = glib::MainContext::default(); let main_loop = glib::MainLoop::new(Some(&context), false); let bus = pipeline.bus().unwrap(); bus.add_watch({ let main_loop = main_loop.clone(); move |_, msg| { use gst::MessageView; let main_loop = &main_loop; match msg.view() { MessageView::Eos(..) => main_loop.quit(), MessageView::Error(err) => { gst::error!(CAT, obj: err.src().unwrap(), "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), err.error(), err.debug() ); main_loop.quit(); } MessageView::Element(element) => { match element.structure().unwrap().name().as_str() { "dtmf-event" => { gst::info!(CAT, "Received DTMF event: {:?}", element.structure().unwrap()); let dtmf_event = DtmfEvent::try_from(element.structure().unwrap()) .expect("Failed to parse DTMF event"); gst::info!(CAT, "Parsed the detected DTMF event: {:?}", dtmf_event); } "dtmf-event-processed" => { let dtmf_cmd = match DtmfCommand::try_from(element.structure().unwrap()) { Ok(ev) => ev, Err(err) => { gst::error!( CAT, "Failed to parse DTMF event {:?} with error: {:?}", element.structure().unwrap(), err ); return glib::Continue(true); } }; match dtmf_cmd { DtmfCommand::Start(number) => { gst::trace!(CAT, "Processed DTMF event {number}"); } DtmfCommand::End(number) => { gst::trace!(CAT, "Processed ending DTMF event: {number}"); } } } _ => { gst::error!( CAT, "Received unknown event: {:?}", element.structure().unwrap().name() ); } } } _ => (), }; glib::Continue(true) } }) .expect("Failed to add bus watch"); // thread::spawn({ // let pipeline_weak = pipeline.downgrade(); // move || { // let Some(pipeline) = pipeline_weak.upgrade() else { // gst::error!(CAT, "Pipeline gone..."); // return; // }; // // let source = pipeline.by_name("src").unwrap(); // // // // // wait pipeline to be running // // let bus = pipeline.bus().unwrap(); // // while let Some(msg) = bus.timed_pop(None) { // // use gst::MessageView; // // if let MessageView::StateChanged(state_changed) = msg.view() { // // if state_changed.src().unwrap() == &source // // && state_changed.current() == gst::State::Playing // // { // // break; // // } // // } // // } // // gst::info!(CAT, "Pipeline is running"); // // // sleep(std::time::Duration::from_secs(5)); // // source.send_event(gst::event::CustomUpstream::new(DtmfCommand::start(0).into())); // // gst::info!(CAT, "Sent DTMF event"); // // // let mut rng = rand::thread_rng(); // // loop { // // let dtmf_cmd = DtmfCommand::start(rng.gen_range(0..15)); // // // source.send_event(gst::event::CustomUpstream::new(DtmfCommand::start(0).into())); // // gst::info!(CAT, "Sent DTMF event {:?}", dtmf_cmd); // // // // source.send_event(gst::event::CustomUpstream::new(dtmf_cmd.end().into())); // // // sleep(std::time::Duration::from_millis(1000)); // // } // } // }); thread::spawn({ let pipeline_weak = pipeline.downgrade(); move || { let runtime = Builder::new_multi_thread() .worker_threads(2) .thread_name("http-server") .enable_all() .build() .unwrap(); runtime.block_on(server::run(8080, pipeline_weak)) } }); pipeline.set_state(gst::State::Playing)?; ctrlc::set_handler({ let main_loop = main_loop.clone(); move || { main_loop.quit(); } })?; main_loop.run(); pipeline.set_state(gst::State::Null)?; bus.remove_watch().unwrap(); Ok(()) }