From ff2b71cbbd02b4b6fc47d6f1fce471b015e1ed88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 26 Jun 2025 17:21:10 +0200 Subject: [PATCH] ts: add ts-rtpdtmfsrc This commit adds a `ts-rtpdtmfsrc`, similar to `rtpdtmfsrc`, but taking advantage of the threadshare runtime, allowing reduced number of threads and context switches when many elements are used. Part-of: --- Cargo.lock | 2 + docs/plugins/gst_plugins_cache.json | 160 +++ generic/threadshare/Cargo.toml | 6 + generic/threadshare/src/lib.rs | 2 + generic/threadshare/src/rtpdtmfsrc/imp.rs | 1418 +++++++++++++++++++++ generic/threadshare/src/rtpdtmfsrc/mod.rs | 17 + generic/threadshare/tests/rtpdtmfsrc.rs | 413 ++++++ 7 files changed, 2018 insertions(+) create mode 100644 generic/threadshare/src/rtpdtmfsrc/imp.rs create mode 100644 generic/threadshare/src/rtpdtmfsrc/mod.rs create mode 100644 generic/threadshare/tests/rtpdtmfsrc.rs diff --git a/Cargo.lock b/Cargo.lock index 95859e24d..cf685ce9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3407,9 +3407,11 @@ dependencies = [ "pkg-config", "polling", "rand 0.9.1", + "rtp-types", "rustix 1.0.7", "slab", "socket2", + "thiserror 2.0.12", "waker-fn", "windows-sys 0.59.0", ] diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index ae67de3f2..94e79ecc1 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -16833,6 +16833,166 @@ }, "rank": "none" }, + "ts-rtpdtmfsrc": { + "author": "François Laignel ", + "description": "Thread-sharing RTP DTMF packet (RFC2833) source", + "hierarchy": [ + "GstTsRTPDTMFSrc", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Source/Network/RTP", + "pad-templates": { + "src": { + "caps": "application/x-rtp:\n media: audio\n payload: [ 96, 127 ]\n clock-rate: [ 0, 2147483647 ]\n encoding-name: TELEPHONE-EVENT\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "clock-rate": { + "blurb": "The clock-rate at which to generate DTMF packets", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "8000", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "context": { + "blurb": "Context name to share threads with", + "conditionally-available": false, + "construct": false, + "construct-only": true, + "controllable": false, + "default": "", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "context-wait": { + "blurb": "Throttle poll loop to run at most once every this many ms", + "conditionally-available": false, + "construct": false, + "construct-only": true, + "controllable": false, + "default": "0", + "max": "1000", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "packet-redundancy": { + "blurb": "Number of packets to send to indicate start and stop DTMF events", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1", + "max": "5", + "min": "1", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "pt": { + "blurb": "The payload type of the packets", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "96", + "max": "128", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "seqnum": { + "blurb": "The RTP Sequence number of the last processed packet", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "seqnum-offset": { + "blurb": "Offset to add to all outgoing seqnum (-1 => random)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "2147483647", + "min": "-1", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + }, + "ssrc": { + "blurb": "The SSRC of the packets (-1 => random)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "timestamp": { + "blurb": "The RTP timestamp of the last processed packet", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": false + }, + "timestamp-offset": { + "blurb": "Offset to add to all outgoing timestamps (-1 = random)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "-1", + "max": "-1", + "min": "-1", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + } + }, + "rank": "none" + }, "ts-tcpclientsrc": { "author": "Sebastian Dröge , LEE Dongjun ", "description": "Receives data over the network via TCP", diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 3bc139acc..93cba5836 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -22,9 +22,11 @@ gst-rtp.workspace = true pin-project-lite = "0.2.0" polling = "3.1.0" rand = "0.9" +rtp-types = "0.1" rustix = { version = "1.0", default-features = false, features = ["std", "fs", "net"] } slab = "0.4.7" socket2 = {features = ["all"], version = "0.5"} +thiserror = "2" waker-fn = "1.1" bitflags = "2.6.0" libc = "0.2" @@ -71,6 +73,10 @@ path = "examples/inter/simple.rs" name = "ts-inter" path = "tests/inter.rs" +[[test]] +name = "ts-rtpdtmfsrc" +path = "tests/rtpdtmfsrc.rs" + [build-dependencies] gst-plugin-version-helper.workspace = true cc = "1.0.38" diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs index a8e08fcb1..6b7b39b3c 100644 --- a/generic/threadshare/src/lib.rs +++ b/generic/threadshare/src/lib.rs @@ -22,6 +22,7 @@ mod inter; mod jitterbuffer; mod proxy; mod queue; +mod rtpdtmfsrc; pub mod socket; mod tcpclientsrc; mod udpsink; @@ -39,6 +40,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { jitterbuffer::register(plugin)?; proxy::register(plugin)?; queue::register(plugin)?; + rtpdtmfsrc::register(plugin)?; tcpclientsrc::register(plugin)?; udpsink::register(plugin)?; udpsrc::register(plugin)?; diff --git a/generic/threadshare/src/rtpdtmfsrc/imp.rs b/generic/threadshare/src/rtpdtmfsrc/imp.rs new file mode 100644 index 000000000..51626efa1 --- /dev/null +++ b/generic/threadshare/src/rtpdtmfsrc/imp.rs @@ -0,0 +1,1418 @@ +// Copyright (C) 2025 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-ts-rtpdtmfsrc + * @title: ts-rtpdtmfsrc + * @see_also: dtmfsrc, rtpdtmfdepay, rtpdtmfmux + * + * Thread-sharing RTP DTMF (RFC 2833) source. + * + * The `ts-rtpdtmfsrc` element generates RTP DTMF (RFC 2833) event packets on request + * from application. The application communicates the beginning and end of a + * DTMF event using custom upstream gstreamer events. + * + * Compated to `rtpdtmfsrc`, `ts-rtpdtmfsrc` takes advantage of the threadshare runtime, + * allowing reduced number of threads and context switches when many elements are used. + * + * To report a DTMF event, an application must send an event of type `gst::event::CustomUpstream`, + * having a structure of name "dtmf-event" with fields set according to the following + * table: + * + * * `type` (`G_TYPE_INT` aka `i32`, 0-1): The application uses this field to specify which of + * the two methods specified in RFC 2833 to use. The value should be 0 for tones and 1 for + * named events. Tones are specified by their frequencies and events are specified + * by their number. This element can only take events as input. Do not confuse + * with "method" which specified the output. + * + * * `number` (`G_TYPE_INT`, 0-15): The event number. + * + * * `volume` (`G_TYPE_INT`, 0-36): This field describes the power level of the tone, + * expressed in dBm0 after dropping the sign. Power levels range from 0 to -63 dBm0. The range + * of valid DTMF is from 0 to -36 dBm0. Can be omitted if start is set to FALSE. + * + * * `start` (`G_TYPE_BOOLEAN` aka `bool`, True or False): Whether the event is starting or ending. + * + * * `method` (G_TYPE_INT, 1): The method used for sending event, this element will react if this + * field is absent or 1. + * + * For example, the following code informs the pipeline (and in turn, the + * `ts-rtpdtmfsrc` element inside the pipeline) about the start of an RTP DTMF named + * event '1' of volume -25 dBm0: + * + * |[ + * let dtmf_start_event = gst::event::CustomUpstream::builder( + * gst::Structure::builder("dtmf-event") + * .field("type", 1i32) + * .field("method", 1i32) + * .field("start", true) + * .field("number", 1i32) + * .field("volume", 25i32) + * .build(), + * ) + * .build(); + * + * pipeline.send_event(dtmf_start_event) + * ]| + * + * When a DTMF tone actually starts or stop, a "dtmf-event-processed" + * element #GstMessage with the same fields as the "dtmf-event" + * #GstEvent that was used to request the event. Also, if any event + * has not been processed when the element goes from the PAUSED to the + * READY state, then a "dtmf-event-dropped" message is posted on the + * #GstBus in the order that they were received. + * + * Since: plugins-rs-0.14.0 + */ +use futures::channel::mpsc; +use futures::prelude::*; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use rand::prelude::*; +use rtp_types::RtpPacketBuilder; + +use std::collections::VecDeque; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{LazyLock, Mutex}; +use std::time::Duration; + +use crate::runtime::prelude::*; +use crate::runtime::{self, timer, PadSrc, Task}; + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "ts-rtpdtmfsrc", + gst::DebugColorFlags::empty(), + Some("Thread-sharing RTP DTMF src"), + ) +}); + +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; + +const DEFAULT_CLOCK_RATE: u32 = 8_000; +const DEFAULT_PACKET_REDUNDANCY: u8 = MIN_PACKET_REDUNDANCY; +const DEFAULT_PT: u8 = 96; +const DEFAULT_PTIME: gst::ClockTime = gst::ClockTime::from_mseconds(40); +const DEFAULT_SEQNUM_OFFSET: Option = None; +const DEFAULT_SSRC: Option = None; +const DEFAULT_TIMESTAMP_OFFSET: Option = None; + +const MIN_INTER_DIGIT_INTERVAL: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const MIN_PACKET_REDUNDANCY: u8 = 1; +const MAX_PACKET_REDUNDANCY: u8 = 5; + +const DEFAULT_DTMF_EVT_CHAN_CAPACITY: usize = 4; + +static DEFAULT_CAPS: LazyLock = LazyLock::new(|| { + gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", gst::IntRange::new(96, 127)) + .field("clock-rate", gst::IntRange::new(0, i32::MAX)) + .field("encoding-name", "TELEPHONE-EVENT") + .build() +}); + +#[derive(Debug, Clone)] +struct Settings { + context: String, + context_wait: Duration, + timestamp: u32, + seqnum: u32, + timestamp_offset: Option, + seqnum_offset: Option, + clock_rate: u32, + ssrc: Option, + pt: u8, + packet_redundancy: u8, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + timestamp: 0, + seqnum: 0, + timestamp_offset: DEFAULT_TIMESTAMP_OFFSET, + seqnum_offset: DEFAULT_SEQNUM_OFFSET, + clock_rate: DEFAULT_CLOCK_RATE, + ssrc: DEFAULT_SSRC, + pt: DEFAULT_PT, + packet_redundancy: DEFAULT_PACKET_REDUNDANCY, + } + } +} + +#[derive(Debug)] +struct RTPDTMFSrcTask { + elem: super::RTPDTMFSrc, + dtmf_evt_rx: mpsc::Receiver, + stream_start_pending: bool, + segment_pending: bool, + + ptime: gst::ClockTime, + + last_stop: Option, + timestamp: gst::ClockTime, + dtmf_payload: Option, + + next_wake_up: Option, + rtp_ts_offset: u32, + rtp_ts: u32, + + seqnum: u16, + ssrc: u32, + + pt: u8, + clock_rate: u32, + + packet_redundancy: u8, + pending_msg: VecDeque, +} + +impl RTPDTMFSrcTask { + fn new(elem: super::RTPDTMFSrc, dtmf_evt_rx: mpsc::Receiver) -> Self { + let imp = elem.imp(); + + let ptime = *imp.ptime.lock().unwrap(); + + let params = imp.get_configured_or_random_params(); + + let settings = imp.settings.lock().unwrap(); + let clock_rate = settings.clock_rate; + let pt = settings.pt; + let packet_redundancy = settings.packet_redundancy; + drop(settings); + + RTPDTMFSrcTask { + elem, + dtmf_evt_rx, + stream_start_pending: true, + segment_pending: true, + + last_stop: None, + timestamp: gst::ClockTime::ZERO, + dtmf_payload: None, + + next_wake_up: None, + rtp_ts_offset: params.rtp_ts_offset, + rtp_ts: 0, + + seqnum: params.seqnum_offset, + ptime, + + clock_rate, + ssrc: params.ssrc, + pt, + + packet_redundancy, + pending_msg: VecDeque::new(), + } + } +} + +#[derive(Debug, Copy, Clone)] +enum PacketPlace { + First, + Intermediate, + Last, +} + +#[derive(Debug)] +struct DTMFPayload { + event_nb: u8, + volume: u8, + duration: u16, + place: PacketPlace, + redundancy_count: u8, +} + +impl DTMFPayload { + fn is_last(&self) -> bool { + matches!(self.place, PacketPlace::Last) + } +} + +#[derive(Clone, Debug)] +enum DTMFEventStatus { + Processed, + Dropped, +} + +/// DTMF event +impl RTPDTMFSrcTask { + fn start_dtmf_payload(&mut self, event_nb: u8, volume: u8) { + assert!(self.dtmf_payload.is_none()); + + let start_timestamp = self.last_stop.unwrap_or_else(|| { + self.elem + .current_running_time() + .expect("element in Playing state") + }); + + self.timestamp = gst::ClockTime::max(self.timestamp, start_timestamp); + + self.dtmf_payload = Some(DTMFPayload { + event_nb, + volume, + duration: (self.ptime.nseconds()) + .mul_div_floor(self.clock_rate as u64, *gst::ClockTime::SECOND) + .unwrap() as u16, + place: PacketPlace::First, + redundancy_count: self.packet_redundancy, + }); + + self.rtp_ts = self.rtp_ts_offset.wrapping_add( + start_timestamp + .nseconds() + .mul_div_floor(self.clock_rate as u64, *gst::ClockTime::SECOND) + .unwrap() as u32, + ); + } + + fn create_next_rtp_packet(&mut self) -> Option { + let dtmf_pay = self.dtmf_payload.as_mut()?; + + let (is_first, end_mask) = match dtmf_pay.place { + PacketPlace::First => { + dtmf_pay.place = PacketPlace::Intermediate; + (true, 0x00) + } + PacketPlace::Intermediate => (false, 0x00), + PacketPlace::Last => (false, 1 << 7), + }; + + let mut rtp_packet = vec![0u8; 16]; + RtpPacketBuilder::new() + .marker_bit(is_first) + .payload_type(self.pt) + .sequence_number(self.seqnum) + .timestamp(self.rtp_ts) + .ssrc(self.ssrc) + .payload( + // See RFC2833 § 3.5 + [ + dtmf_pay.event_nb, + end_mask | dtmf_pay.volume, + (dtmf_pay.duration >> 8) as u8, + (dtmf_pay.duration & 0xff) as u8, + ] + .as_ref(), + ) + .write_into(&mut rtp_packet) + .unwrap_or_else(|err| { + panic!("Failed to write RTP packet for {dtmf_pay:?}: {err}"); + }); + + let mut rtp_buffer = gst::Buffer::from_mut_slice(rtp_packet); + { + let rtp_buffer = rtp_buffer.get_mut().unwrap(); + + rtp_buffer.set_pts(self.timestamp); + + let duration = if dtmf_pay.redundancy_count > 1 { + gst::ClockTime::ZERO + } else if dtmf_pay.is_last() { + let inter_digit_remainder = MIN_INTER_DIGIT_INTERVAL % self.ptime; + if inter_digit_remainder.is_zero() { + self.ptime + } else { + self.ptime + MIN_INTER_DIGIT_INTERVAL + self.ptime - inter_digit_remainder + } + } else { + self.ptime + }; + + rtp_buffer.set_duration(duration); + + gst::log!( + CAT, + obj = self.elem, + "Created buffer with DTMF event {} duration {duration} pts {} RTP ts {}", + dtmf_pay.event_nb, + self.timestamp, + self.rtp_ts, + ); + + // Duration of DTMF payload for the NEXT packet + // not updated for redundant packets. + if dtmf_pay.redundancy_count <= 1 { + dtmf_pay.duration += (self.ptime.nseconds()) + .mul_div_floor(self.clock_rate as u64, *gst::ClockTime::SECOND) + .unwrap() as u16; + } + + dtmf_pay.redundancy_count = dtmf_pay.redundancy_count.saturating_sub(1); + if dtmf_pay.is_last() && dtmf_pay.redundancy_count == 0 { + self.dtmf_payload = None; + } else { + self.timestamp.opt_add_assign(duration); + } + } + + self.seqnum = self.seqnum.wrapping_add(1); + + Some(rtp_buffer) + } + + fn prepare_message(&self, dtmf_evt: &DTMFEvent, evt_status: DTMFEventStatus) -> gst::Message { + use DTMFEventStatus::*; + let struct_builder = gst::Structure::builder(match evt_status { + Processed => "dtmf-event-processed", + Dropped => "dtmf-event-dropped", + }) + .field("type", 1i32) + .field("method", 1i32); + + use DTMFEvent::*; + gst::message::Element::builder(match *dtmf_evt { + Start { number, volume, .. } => struct_builder + .field("start", true) + .field("number", number as i32) + .field("volume", volume as i32) + .build(), + Stop { .. } => struct_builder.field("start", false).build(), + }) + .src(&self.elem) + .build() + } + + fn dtmf_evt_to_item(&self, dtmf_evt: Option) -> Result { + let Some(dtmf_evt) = dtmf_evt else { + gst::error!(CAT, obj = self.elem, "DTMF event channel is broken"); + gst::element_error!( + self.elem, + gst::CoreError::Failed, + ["DTMF event Queue is broken"] + ); + return Err(gst::FlowError::Error); + }; + + Ok(TaskItem::Event(dtmf_evt)) + } +} + +#[derive(Debug)] +enum TaskItem { + Event(DTMFEvent), + Timer, +} + +impl TaskImpl for RTPDTMFSrcTask { + type Item = TaskItem; + + async fn start(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(CAT, obj = self.elem, "Starting Task"); + + if self.stream_start_pending { + gst::debug!(CAT, obj = self.elem, "Pushing initial events"); + + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + self.elem.imp().srcpad.push_event(stream_start).await; + self.stream_start_pending = false; + } + + self.negotiate().map_err(|err| { + gst::error_msg!( + gst::CoreError::Negotiation, + ["Caps negotiation failed: {err}"] + ) + })?; + + if self.segment_pending { + let segment = gst::FormattedSegment::::new(); + self.elem + .imp() + .srcpad + .push_event(gst::event::Segment::new(&segment)) + .await; + + self.segment_pending = false; + } + + Ok(()) + } + + async fn stop(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(CAT, obj = self.elem, "Stopping Task"); + + self.flush().await; + self.stream_start_pending = true; + self.segment_pending = true; + + Ok(()) + } + + async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> { + gst::log!(CAT, obj = self.elem, "Starting task flush"); + + self.flush().await; + self.segment_pending = true; + + gst::log!(CAT, obj = self.elem, "Task flush started"); + Ok(()) + } + + async fn try_next(&mut self) -> Result { + Ok(if let Some(ref mut next_wake_up) = self.next_wake_up { + futures::select! { + _ = next_wake_up.fuse() => TaskItem::Timer, + dtmf_evt = self.dtmf_evt_rx.next() => { + self.dtmf_evt_to_item(dtmf_evt)? + } + } + } else { + let dtmf_evt = self.dtmf_evt_rx.next().await; + self.dtmf_evt_to_item(dtmf_evt)? + }) + } + + async fn handle_item(&mut self, item: TaskItem) -> Result<(), gst::FlowError> { + gst::debug!(CAT, obj = self.elem, "Handling {item:?}"); + + use DTMFEvent::*; + use DTMFEventStatus::*; + match item { + TaskItem::Event(dtmf_evt) => match dtmf_evt { + Start { + number, + volume, + last_stop, + } => { + self.last_stop = last_stop; + + if self.dtmf_payload.is_some() { + gst::warning!( + CAT, + obj = self.elem, + "Received two consecutive DTMF start events" + ); + self.elem + .post_message(self.prepare_message(&dtmf_evt, Dropped)) + .expect("element in Playing state"); + return Ok(()); + }; + + self.start_dtmf_payload(number, volume); + self.pending_msg + .push_back(self.prepare_message(&dtmf_evt, Processed)); + } + Stop { last_stop } => { + self.last_stop = last_stop; + + let Some(dtmf_payload) = self.dtmf_payload.as_mut() else { + gst::warning!( + CAT, + obj = self.elem, + "Received a DTMF stop event while already stopped" + ); + self.elem + .post_message(self.prepare_message(&dtmf_evt, Dropped)) + .expect("element in Playing state"); + return Ok(()); + }; + + dtmf_payload.place = PacketPlace::Last; + dtmf_payload.redundancy_count = self.packet_redundancy; + self.pending_msg + .push_back(self.prepare_message(&dtmf_evt, Processed)); + } + }, + TaskItem::Timer => { + self.next_wake_up = None; + self.push_next_packet().await?; + } + } + + self.next_wake_up = self.dtmf_payload.as_ref().map(|_| { + let now = self + .elem + .current_running_time() + .expect("element in Playing state"); + + gst::log!( + CAT, + obj = self.elem, + "Setting next wake up for timestamp {}, now {now}", + self.timestamp + ); + + timer::delay_for(Duration::from_nanos(*(self.timestamp.saturating_sub(now)))) + }); + + Ok(()) + } +} + +impl RTPDTMFSrcTask { + async fn push_next_packet(&mut self) -> Result<(), gst::FlowError> { + while let Some(msg) = self.pending_msg.pop_front() { + self.elem + .post_message(msg) + .expect("element in Playing state"); + } + + if let Some(rtp_buffer) = self.create_next_rtp_packet() { + gst::debug!(CAT, obj = self.elem, "Pushing RTP packet {rtp_buffer:?}"); + self.elem.imp().srcpad.push(rtp_buffer).await?; + } + + gst::log!(CAT, obj = self.elem, "Pushed RTP packet"); + + Ok(()) + } + + async fn flush(&mut self) { + if let Some(ref mut dtmf_payload) = self.dtmf_payload { + dtmf_payload.place = PacketPlace::Last; + dtmf_payload.redundancy_count = self.packet_redundancy; + + let _ = self.push_next_packet().await; + + while let Ok(Some(dtmf_evt)) = self.dtmf_evt_rx.try_next() { + let _ = self + .elem + .post_message(self.prepare_message(&dtmf_evt, DTMFEventStatus::Dropped)); + } + } + + let imp = self.elem.imp(); + + imp.last_event_was_start.store(false, Ordering::SeqCst); + + self.next_wake_up = None; + self.last_stop = None; + self.timestamp = gst::ClockTime::ZERO; + + let params = imp.get_configured_or_random_params(); + self.rtp_ts_offset = params.rtp_ts_offset; + self.rtp_ts = params.rtp_ts_offset; + self.seqnum = params.seqnum_offset; + self.ssrc = params.ssrc; + } +} + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +enum CapsNegotiationError { + #[error("Could not intersect with peer caps {}", .0)] + CapsIntersection(gst::Caps), + + #[error("Peer's '{field}' with value {value} is out of range")] + OutOfRange { field: String, value: i64 }, + + #[error("Failed to fixate '{field}' with {value} in {caps}")] + Fixate { + field: String, + value: i64, + caps: gst::Caps, + }, +} + +/// Caps negotiation +impl RTPDTMFSrcTask { + fn negotiate(&mut self) -> Result<(), CapsNegotiationError> { + let imp = self.elem.imp(); + let pad = imp.srcpad.gst_pad(); + + let src_tmpl_caps = pad.pad_template_caps(); + let peercaps = pad.peer_query_caps(Some(&src_tmpl_caps)); + gst::log!(CAT, imp = imp, "Peer returned {peercaps:?}"); + + let newcaps = if peercaps.is_empty() { + let srccaps = gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", self.pt) + .field("ssrc", self.ssrc) + .field("timestamp-offset", self.rtp_ts_offset) + .field("clock-rate", self.clock_rate) + .field("seqnum-offset", self.seqnum as u32) + .field("encoding-name", "TELEPHONE-EVENT") + .build(); + + gst::debug!(CAT, obj = self.elem, "No peer caps, using {srccaps}"); + srccaps + } else { + let mut inter = + peercaps.intersect_with_mode(&src_tmpl_caps, gst::CapsIntersectMode::First); + if inter.is_empty() { + return Err(CapsNegotiationError::CapsIntersection(peercaps)); + } + + { + let inter = inter.make_mut(); + let s = inter.structure_mut(0).expect("not empty"); + + match s.get_optional::("payload") { + Ok(Some(pt)) => { + let pt = pt + .try_into() + .map_err(|_| CapsNegotiationError::OutOfRange { + field: "pt".to_string(), + value: pt as i64, + })?; + + gst::log!(CAT, imp = imp, "Using peer pt {pt}"); + self.pt = pt; + } + Ok(None) => { + s.set("payload", self.pt as i32); + gst::log!(CAT, imp = imp, "Using internal pt {}", self.pt); + } + Err(_) => { + if s.fixate_field_nearest_int("payload", self.pt as i32) { + self.pt = s.get::("payload").unwrap() as u8; + + gst::log!(CAT, imp = imp, "Using fixated pt {}", self.pt); + } else { + return Err(CapsNegotiationError::Fixate { + field: "payload".to_string(), + value: self.pt as i64, + caps: peercaps, + }); + } + } + } + + match s.get_optional::("clock-rate") { + Ok(Some(clock_rate)) => { + let clock_rate = clock_rate.try_into().map_err(|_| { + CapsNegotiationError::OutOfRange { + field: "clock-rate".to_string(), + value: clock_rate as i64, + } + })?; + + gst::log!(CAT, imp = imp, "Using peer clock-rate {clock_rate}"); + self.clock_rate = clock_rate; + } + Ok(None) => { + s.set("clock-rate", self.clock_rate as i32); + gst::log!( + CAT, + imp = imp, + "Using internal clock-rate {}", + self.clock_rate + ); + } + Err(_) => { + if s.fixate_field_nearest_int("clock-rate", self.clock_rate as i32) { + self.clock_rate = s.get::("clock-rate").unwrap() as u32; + gst::log!( + CAT, + imp = imp, + "Using fixated clock-rate {}", + self.clock_rate + ); + } else { + return Err(CapsNegotiationError::Fixate { + field: "clock-rate".to_string(), + value: self.clock_rate as i64, + caps: peercaps, + }); + } + } + } + + match s.get_optional::("ssrc") { + Ok(Some(ssrc)) => { + gst::log!(CAT, imp = imp, "Using peer ssrc {ssrc:#08x}"); + self.ssrc = ssrc; + } + other => { + if let Err(err) = other { + gst::warning!( + CAT, + imp = imp, + "Invalid type for peer 'ssrc' in {s}: {err}" + ); + } + s.set("ssrc", self.ssrc); + gst::log!(CAT, imp = imp, "Using internal ssrc {}", self.ssrc); + } + } + + match s.get_optional::("timestamp-offset") { + Ok(Some(timestamp_offset)) => { + gst::log!( + CAT, + imp = imp, + "Using peer timestamp-offset {timestamp_offset}" + ); + self.rtp_ts_offset = timestamp_offset; + } + other => { + if let Err(err) = other { + // Would be cool to be able to fixate uint + gst::warning!( + CAT, + imp = imp, + "Invalid type for peer 'timestamp-offset' in {s}: {err}" + ); + } + s.set("timestamp-offset", self.rtp_ts_offset); + gst::log!( + CAT, + imp = imp, + "Using internal timestamp-offset {}", + self.rtp_ts_offset + ); + } + } + + match s.get_optional::("seqnum-offset") { + Ok(Some(seqnum_offset)) => { + let seqnum_offset = seqnum_offset.try_into().map_err(|_| { + CapsNegotiationError::OutOfRange { + field: "seqnum-offset".to_string(), + value: seqnum_offset as i64, + } + })?; + + gst::log!(CAT, imp = imp, "Using peer seqnum-offset {seqnum_offset}"); + self.seqnum = seqnum_offset; + } + other => { + if let Err(err) = other { + // Would be cool to be able to fixate uint + gst::warning!( + CAT, + imp = imp, + "Invalid type for peer 'seqnum-offset' in {s}: {err}" + ); + } + s.set("seqnum-offset", self.seqnum as u32); + gst::log!( + CAT, + imp = imp, + "Using internal seqnum-offset {}", + self.seqnum + ); + } + } + + if let Ok(Some(ptime)) = s.get_optional::("ptime") { + gst::log!(CAT, imp = imp, "Using peer ptime {ptime}"); + self.ptime = gst::ClockTime::from_mseconds(ptime as u64); + *imp.ptime.lock().unwrap() = self.ptime; + } else { + match s.get_optional::("maxptime") { + Ok(Some(maxptime)) => { + gst::log!(CAT, imp = imp, "Using peer maxptime {maxptime}"); + self.ptime = gst::ClockTime::from_mseconds(maxptime as u64); + *imp.ptime.lock().unwrap() = self.ptime; + } + other => { + if let Err(err) = other { + // Would be cool to be able to fixate uint + gst::warning!( + CAT, + imp = imp, + "Invalid type for peer 'ptime' / 'maxptime' in {s}: {err}" + ); + s.remove_field("maxptime"); + } + s.set("ptime", self.ptime); + gst::log!(CAT, imp = imp, "Using internal ptime {}", self.ptime); + } + } + } + } + + gst::debug!(CAT, obj = self.elem, "Processed peer caps => {inter}"); + inter + }; + + pad.push_event(gst::event::Caps::new(&newcaps)); + + Ok(()) + } +} + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +enum DTMFEventError { + #[error("Not a DTMF event")] + NotDTMFEvent, + + #[error("Unsupported DTMF event type {}", .0)] + UnsupportedType(i32), + + #[error("Unsupported DTMF event method {}", .0)] + UnsupportedMethod(i32), + + #[error("Field {field}: {err}")] + FieldError { field: String, err: String }, +} + +#[derive(Clone, Debug)] +enum DTMFEvent { + Start { + number: u8, + volume: u8, + last_stop: Option, + }, + Stop { + last_stop: Option, + }, +} + +impl DTMFEvent { + fn try_parse(event: &gst::event::CustomUpstream) -> Result { + use DTMFEventError::*; + + let Some(s) = event.structure() else { + return Err(NotDTMFEvent); + }; + if s.name() != "dtmf-event" { + return Err(NotDTMFEvent); + } + + let evt_type = s.get::("type").map_err(|err| FieldError { + field: "type".to_string(), + err: err.to_string(), + })?; + if evt_type != 1i32 { + return Err(UnsupportedType(evt_type)); + } + + let start = s.get::("start").map_err(|err| FieldError { + field: "start".to_string(), + err: err.to_string(), + })?; + + let method = s.get_optional::("method").map_err(|err| FieldError { + field: "method".to_string(), + err: err.to_string(), + })?; + if method.is_some_and(|method| method != 1i32) { + return Err(UnsupportedMethod(method.expect("checked above"))); + } + + let last_stop = s + .get_optional::("last-stop") + .map_err(|err| FieldError { + field: "last-stop".to_string(), + err: err.to_string(), + })?; + + let dtmf_evt = if start { + let number = s.get::("number").map_err(|err| FieldError { + field: "number".to_string(), + err: err.to_string(), + })?; + if !(0..=15).contains(&number) { + return Err(FieldError { + field: "number".to_string(), + err: format!("{number} is out of range [0, 15]"), + }); + } + + let volume = s.get::("volume").map_err(|err| FieldError { + field: "volume".to_string(), + err: err.to_string(), + })?; + if !(0..=36).contains(&volume) { + return Err(FieldError { + field: "volume".to_string(), + err: format!("{volume} is out of range [0, 36]"), + }); + } + + DTMFEvent::Start { + number: number as u8, + volume: volume as u8, + last_stop, + } + } else { + DTMFEvent::Stop { last_stop } + }; + + Ok(dtmf_evt) + } + + fn is_start(&self) -> bool { + matches!(*self, DTMFEvent::Start { .. }) + } +} + +#[derive(Debug)] +pub struct ConfiguredOrRandomParams { + rtp_ts_offset: u32, + seqnum_offset: u16, + ssrc: u32, +} + +#[derive(Debug)] +pub struct RTPDTMFSrc { + srcpad: PadSrc, + task: Task, + settings: Mutex, + last_event_was_start: AtomicBool, + ptime: Mutex, + dtmf_evt_tx: Mutex>>, +} + +impl RTPDTMFSrc { + /// Handles the DTMF event + /// + /// Returns `true` if it could be handled, `false` otherwise. + fn handle_maybe_dtmf_event(&self, event: &gst::Event) -> bool { + gst::log!(CAT, imp = self, "Handling {event:?}"); + + let gst::EventView::CustomUpstream(evt) = event.view() else { + gst::log!(CAT, imp = self, "Not Handling unknown {event:?}"); + return false; + }; + + let dtmf_evt = match DTMFEvent::try_parse(evt) { + Ok(dtmf_evt) => dtmf_evt, + Err(DTMFEventError::NotDTMFEvent) => { + return false; + } + Err(err) => { + gst::error!( + CAT, + imp = self, + "Failed to parse incoming DTMF event: {err}" + ); + return false; + } + }; + + let is_start = dtmf_evt.is_start(); + if is_start == self.last_event_was_start.load(Ordering::SeqCst) { + gst::error!( + CAT, + imp = self, + "Unexpected {} event", + if is_start { "start" } else { "end" }, + ); + return false; + } + + if let Err(err) = self + .dtmf_evt_tx + .lock() + .unwrap() + .as_mut() + .expect("set in prepare") + .try_send(dtmf_evt) + { + if err.is_full() { + // This probably means the app is spamming us + let dtmf_event = err.into_inner(); + gst::error!( + CAT, + imp = self, + "DTMF event channel is full => dropping {dtmf_event:?}" + ); + } else { + gst::error!(CAT, imp = self, "DTMF event channel is broken"); + gst::element_error!( + self.obj(), + gst::CoreError::Failed, + ["DTMF event Queue is broken"] + ); + } + + return false; + } + + self.last_event_was_start.store(is_start, Ordering::SeqCst); + + true + } + + fn get_configured_or_random_params(&self) -> ConfiguredOrRandomParams { + let mut rng = rand::rng(); + + let settings = self.settings.lock().unwrap(); + + ConfiguredOrRandomParams { + rtp_ts_offset: settings + .timestamp_offset + .unwrap_or_else(|| rng.random::()), + seqnum_offset: settings + .seqnum_offset + .unwrap_or_else(|| rng.random::()), + ssrc: settings.ssrc.unwrap_or_else(|| rng.random::()), + } + } + + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp = self, "Preparing"); + + let settings = self.settings.lock().unwrap(); + let context = + runtime::Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; + drop(settings); + + let (dtmf_evt_tx, dtmf_evt_rx) = mpsc::channel(DEFAULT_DTMF_EVT_CHAN_CAPACITY); + self.task + .prepare( + RTPDTMFSrcTask::new(self.obj().clone(), dtmf_evt_rx), + context, + ) + .block_on()?; + *self.dtmf_evt_tx.lock().unwrap() = Some(dtmf_evt_tx); + + gst::debug!(CAT, imp = self, "Prepared"); + + Ok(()) + } + + fn unprepare(&self) { + gst::debug!(CAT, imp = self, "Unpreparing"); + + self.task.unprepare().block_on().unwrap(); + *self.dtmf_evt_tx.lock().unwrap() = None; + + gst::debug!(CAT, imp = self, "Unprepared"); + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp = self, "Stopping"); + + self.task.stop().block_on()?; + { + let mut settings = self.settings.lock().unwrap(); + settings.timestamp = 0; + settings.seqnum = 0; + } + + gst::debug!(CAT, imp = self, "Stopped"); + + Ok(()) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp = self, "Starting"); + self.task.start().block_on()?; + gst::debug!(CAT, imp = self, "Started"); + + Ok(()) + } + + fn pause(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp = self, "Pausing"); + self.task.pause().block_on()?; + gst::debug!(CAT, imp = self, "Paused"); + + Ok(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RTPDTMFSrc { + const NAME: &'static str = "GstTsRTPDTMFSrc"; + type Type = super::RTPDTMFSrc; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + Self { + srcpad: PadSrc::new( + gst::Pad::from_template(&klass.pad_template("src").unwrap()), + RTPDTMFSrcPadHandler, + ), + task: Task::default(), + settings: Default::default(), + last_event_was_start: AtomicBool::new(false), + ptime: Mutex::new(DEFAULT_PTIME), + dtmf_evt_tx: Mutex::new(None), + } + } +} + +impl ObjectImpl for RTPDTMFSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecString::builder("context") + .nick("Context") + .blurb("Context name to share threads with") + .default_value(Some(DEFAULT_CONTEXT)) + .readwrite() + .construct_only() + .build(), + glib::ParamSpecUInt::builder("context-wait") + .nick("Context Wait") + .blurb("Throttle poll loop to run at most once every this many ms") + .maximum(1000) + .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) + .readwrite() + .construct_only() + .build(), + glib::ParamSpecUInt::builder("timestamp") + .nick("Timestamp") + .blurb("The RTP timestamp of the last processed packet") + .minimum(0) + .maximum(u32::MAX) + .read_only() + .build(), + glib::ParamSpecUInt::builder("seqnum") + .nick("Sequence number") + .blurb("The RTP Sequence number of the last processed packet") + .minimum(0) + .maximum(u32::MAX) + .read_only() + .build(), + glib::ParamSpecInt::builder("timestamp-offset") + .nick("Timestamp Offset") + .blurb("Offset to add to all outgoing timestamps (-1 = random)") + .minimum(-1) + .maximum(u32::MAX as i32) + .default_value(DEFAULT_TIMESTAMP_OFFSET.map_or(-1i32, |val| val as i32)) + .build(), + glib::ParamSpecInt::builder("seqnum-offset") + .nick("Sequence Number Offset") + .blurb("Offset to add to all outgoing seqnum (-1 => random)") + .minimum(-1) + .maximum(i32::MAX) + .default_value(DEFAULT_SEQNUM_OFFSET.map_or(-1i32, |val| val as i32)) + .build(), + glib::ParamSpecUInt::builder("clock-rate") + .nick("Clock-rate") + .blurb("The clock-rate at which to generate DTMF packets") + .minimum(0) + .maximum(u32::MAX) + .default_value(DEFAULT_CLOCK_RATE) + .build(), + glib::ParamSpecUInt::builder("ssrc") + .nick("Synchronization Source (SSRC)") + .blurb("The SSRC of the packets (-1 => random)") + .minimum(0) + .maximum(u32::MAX) + .default_value(DEFAULT_SSRC.unwrap_or(u32::MAX)) + .build(), + glib::ParamSpecUInt::builder("pt") + .nick("Payload Type") + .blurb("The payload type of the packets") + .minimum(0) + .maximum(0x80) + .default_value(DEFAULT_PT as u32) + .build(), + glib::ParamSpecUInt::builder("packet-redundancy") + .nick("Packet Redundancy") + .blurb("Number of packets to send to indicate start and stop DTMF events") + .minimum(MIN_PACKET_REDUNDANCY as u32) + .maximum(MAX_PACKET_REDUNDANCY as u32) + .default_value(DEFAULT_PACKET_REDUNDANCY as u32) + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { + "context" => { + settings.context = value + .get::>() + .unwrap() + .unwrap_or_else(|| DEFAULT_CONTEXT.into()); + } + "context-wait" => { + settings.context_wait = Duration::from_millis(value.get::().unwrap().into()); + } + "timestamp-offset" => { + settings.timestamp_offset = value.get::().unwrap().try_into().ok(); + } + "seqnum-offset" => { + settings.seqnum_offset = value.get::().unwrap().try_into().ok(); + } + "clock-rate" => { + settings.clock_rate = value.get::().unwrap(); + } + "ssrc" => { + settings.ssrc = value.get::().unwrap().try_into().ok(); + } + "pt" => { + settings.pt = value.get::().unwrap().try_into().unwrap(); + } + "packet-redundancy" => { + settings.packet_redundancy = value.get::().unwrap().try_into().unwrap(); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "context" => settings.context.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), + "timestamp" => settings.timestamp.to_value(), + "seqnum" => settings.seqnum.to_value(), + "timestamp-offset" => settings + .timestamp_offset + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), + "seqnum-offset" => settings + .seqnum_offset + .map_or(-1i32, |val| val as i32) + .to_value(), + "clock-rate" => settings.clock_rate.to_value(), + "ssrc" => settings + .ssrc + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), + "pt" => (settings.pt as u32).to_value(), + "packet-redundancy" => (settings.packet_redundancy as u32).to_value(), + _ => unimplemented!(), + } + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(self.srcpad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } +} + +impl GstObjectImpl for RTPDTMFSrc {} + +impl ElementImpl for RTPDTMFSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Thread-sharing RTP DTMF source", + "Source/Network/RTP", + "Thread-sharing RTP DTMF packet (RFC2833) source", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn send_event(&self, event: gst::Event) -> bool { + gst::log!(CAT, imp = self, "Got {event:?}"); + + if self.handle_maybe_dtmf_event(&event) { + true + } else { + self.parent_send_event(event) + } + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &DEFAULT_CAPS, + ) + .unwrap(); + + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, imp = self, "Changing state {transition:?}"); + + match transition { + gst::StateChange::NullToReady => { + self.prepare().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.pause().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(); + } + _ => (), + } + + let mut success = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::ReadyToPaused => { + self.pause().map_err(|_| gst::StateChangeError)?; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToPlaying => { + self.start().map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + Ok(success) + } +} + +#[derive(Clone, Debug)] +struct RTPDTMFSrcPadHandler; + +impl PadSrcHandler for RTPDTMFSrcPadHandler { + type ElementImpl = RTPDTMFSrc; + + fn src_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool { + gst::log!(CAT, obj = pad, "Handling {event:?}"); + + use gst::EventView::*; + match event.view() { + FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(), + FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(), + Reconfigure(..) => true, + Latency(..) => true, + _ => imp.handle_maybe_dtmf_event(&event), + } + } + + fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj = pad, "Received {query:?}"); + + if let gst::QueryViewMut::Latency(q) = query.view_mut() { + let latency = *imp.ptime.lock().unwrap() + // timers can be up to 1/2 x context-wait late + + gst::ClockTime::from_nseconds( + imp.settings.lock().unwrap().context_wait.as_nanos() as u64, + ) / 2; + + gst::debug!(CAT, imp = imp, "Reporting latency of {latency}"); + q.set(true, latency, gst::ClockTime::NONE); + + return true; + } + + gst::Pad::query_default(pad, Some(&*imp.obj()), query) + } +} diff --git a/generic/threadshare/src/rtpdtmfsrc/mod.rs b/generic/threadshare/src/rtpdtmfsrc/mod.rs new file mode 100644 index 000000000..e1a8084d0 --- /dev/null +++ b/generic/threadshare/src/rtpdtmfsrc/mod.rs @@ -0,0 +1,17 @@ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct RTPDTMFSrc(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ts-rtpdtmfsrc", + gst::Rank::NONE, + RTPDTMFSrc::static_type(), + ) +} diff --git a/generic/threadshare/tests/rtpdtmfsrc.rs b/generic/threadshare/tests/rtpdtmfsrc.rs new file mode 100644 index 000000000..9687a560b --- /dev/null +++ b/generic/threadshare/tests/rtpdtmfsrc.rs @@ -0,0 +1,413 @@ +// use futures::channel::oneshot; +use futures::prelude::*; +use gst::prelude::*; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use gstthreadshare::runtime; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstthreadshare::plugin_register_static().expect("gstthreadshare inter test"); + }); +} + +#[test] +fn nominal() { + init(); + + let pipe = gst::Pipeline::with_name("rtpdtmfsrc::nominal"); + // let src = gst::ElementFactory::make("rtpdtmfsrc") + let src = gst::ElementFactory::make("ts-rtpdtmfsrc") + .property("context", "rtpdtmfsrc::nominal") + .property("context-wait", 20u32) + .name("src::rtpdtmfsrc::nominal") + .build() + .unwrap(); + let mux = gst::ElementFactory::make("rtpdtmfmux").build().unwrap(); + let appsink = gst_app::AppSink::builder() + .name("appsink::rtpdtmfsrc::nominal") + .caps( + &gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("clock-rate", gst::List::new([8_000, 16_000])) + .field("encoding-name", "TELEPHONE-EVENT") + .build(), + ) + .build(); + + let elems = [&src, &mux, appsink.upcast_ref::()]; + pipe.add_many(elems).unwrap(); + gst::Element::link_many(elems).unwrap(); + + // Steps + const INIT: u32 = 0; + const SEND_EARLY_END: u32 = 1; + const SEND_START_4: u32 = 2; + const START_4_SENT: u32 = 3; + const START_4_MSG_RECEIVED: u32 = 4; + const START_4_BUFFER_RECEIVED: u32 = 5; + const START_4_MSG_AND_BUFFER_RECEIVED: u32 = 6; + // time interval before end + const SEND_4_END: u32 = 9; + const END_4_SENT: u32 = 10; + const END_4_MSG_RECEIVED: u32 = 11; + const END_4_BUFFER_RECEIVED: u32 = 12; + const END_4_MSG_AND_BUFFER_RECEIVED: u32 = 13; + + const SEND_START_2: u32 = 20; + const START_2_SENT: u32 = 21; + const START_2_MSG_RECEIVED: u32 = 22; + const START_2_BUFFER_RECEIVED: u32 = 23; + const START_2_MSG_AND_BUFFER_RECEIVED: u32 = 24; + // time interval before end + const SEND_2_END: u32 = 27; + const END_2_SENT: u32 = 28; + const END_2_MSG_RECEIVED: u32 = 29; + const END_2_BUFFER_RECEIVED: u32 = 30; + const END_2_MSG_AND_BUFFER_RECEIVED: u32 = 31; + + const VALID_EVENT_4_DIGIT: u8 = 4; + const VALID_EVENT_4_VOLUME: u8 = 12; + const VALID_EVENT_2_DIGIT: u8 = 2; + const VALID_EVENT_2_VOLUME: u8 = 18; + + const PAYLOAD_DIGIT_BYTE_INDEX: usize = 12; + const PAYLOAD_END_VOLUME_BYTE_INDEX: usize = 13; + const PAYLOAD_END_MASK: u8 = 0x80; + const PAYLOAD_DURATION_BIG_BYTE_INDEX: usize = 14; + const PAYLOAD_DURATION_SMALL_BYTE_INDEX: usize = 15; + const PAYLOAD_DURATION_INCREMENT: u16 = 40 * 8; // 40ms @ 8kHZ + + const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(40); + const BUFFER_DURATION_END: gst::ClockTime = gst::ClockTime::from_mseconds(40 * 4); + + let step = Arc::new(Mutex::new(INIT)); + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample({ + let step = step.clone(); + let mut buffer_idx = 0; + move |appsink| { + let sample = appsink.pull_sample().unwrap(); + let buffer = sample.buffer().unwrap(); + let buf = buffer.map_readable().unwrap(); + let duration = (buf[PAYLOAD_DURATION_BIG_BYTE_INDEX] as u16) << 8 + | buf[PAYLOAD_DURATION_SMALL_BYTE_INDEX] as u16; + let mut step = step.lock().unwrap(); + + let details = |buffer_idx| { + format!( + "idx {buffer_idx} ts {} @ step {step:02} DTMF payload: {:02x} {:02x} {:02x} {:02x}", + buffer.pts().display(), + buf[PAYLOAD_DIGIT_BYTE_INDEX], + buf[PAYLOAD_END_VOLUME_BYTE_INDEX], + buf[PAYLOAD_DURATION_BIG_BYTE_INDEX], + buf[PAYLOAD_DURATION_SMALL_BYTE_INDEX], + ) + }; + + match *step { + START_4_SENT | START_4_MSG_RECEIVED => { + assert_eq!(buf[PAYLOAD_DIGIT_BYTE_INDEX], VALID_EVENT_4_DIGIT); + assert_eq!(buf[PAYLOAD_END_VOLUME_BYTE_INDEX], VALID_EVENT_4_VOLUME); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + println!( + "rtpdtmfsrc::nominal start buffer received {}", + details(buffer_idx) + ); + + buffer_idx += 1; + *step = if *step == START_4_SENT { + START_4_BUFFER_RECEIVED + } else { + START_4_MSG_AND_BUFFER_RECEIVED + }; + } + START_4_BUFFER_RECEIVED..END_4_SENT => { + assert_eq!(buf[PAYLOAD_DIGIT_BYTE_INDEX], VALID_EVENT_4_DIGIT); + assert_eq!(buf[PAYLOAD_END_VOLUME_BYTE_INDEX], VALID_EVENT_4_VOLUME); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + println!( + "rtpdtmfsrc::nominal intermediate buffer received {}", + details(buffer_idx) + ); + + buffer_idx += 1; + } + END_4_SENT | END_4_MSG_RECEIVED => { + if buf[PAYLOAD_END_VOLUME_BYTE_INDEX] & PAYLOAD_END_MASK != 0 { + println!( + "rtpdtmfsrc::nominal end buffer received {}", + details(buffer_idx) + ); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION_END); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + + buffer_idx += 1; + *step = if *step == END_4_SENT { + END_4_BUFFER_RECEIVED + } else { + END_4_MSG_AND_BUFFER_RECEIVED + }; + } else { + println!( + "rtpdtmfsrc::nominal late intermediate buffer received {}", + details(buffer_idx) + ); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + + buffer_idx += 1; + } + } + START_2_SENT | START_2_MSG_RECEIVED => { + // New digit + buffer_idx = 0; + + assert_eq!(buf[PAYLOAD_DIGIT_BYTE_INDEX], VALID_EVENT_2_DIGIT); + assert_eq!(buf[PAYLOAD_END_VOLUME_BYTE_INDEX], VALID_EVENT_2_VOLUME); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + println!( + "rtpdtmfsrc::nominal start buffer received {}", + details(buffer_idx) + ); + + buffer_idx += 1; + *step = if *step == START_2_SENT { + START_2_BUFFER_RECEIVED + } else { + START_2_MSG_AND_BUFFER_RECEIVED + }; + } + START_2_SENT..END_2_SENT => { + assert_eq!(buf[PAYLOAD_DIGIT_BYTE_INDEX], VALID_EVENT_2_DIGIT); + assert_eq!(buf[PAYLOAD_END_VOLUME_BYTE_INDEX], VALID_EVENT_2_VOLUME); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + println!( + "rtpdtmfsrc::nominal intermediate buffer received {}", + details(buffer_idx) + ); + + buffer_idx += 1; + } + END_2_SENT | END_2_MSG_RECEIVED => { + if buf[PAYLOAD_END_VOLUME_BYTE_INDEX] & PAYLOAD_END_MASK != 0 { + println!( + "rtpdtmfsrc::nominal end buffer received {}", + details(buffer_idx) + ); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION_END); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + + buffer_idx += 1; + *step = if *step == END_2_SENT { + END_2_BUFFER_RECEIVED + } else { + END_2_MSG_AND_BUFFER_RECEIVED + }; + } else { + println!( + "rtpdtmfsrc::nominal late intermediate buffer received {}", + details(buffer_idx) + ); + assert_eq!(buffer.duration().unwrap(), BUFFER_DURATION); + assert_eq!(duration, (buffer_idx + 1) * PAYLOAD_DURATION_INCREMENT); + + buffer_idx += 1; + } + } + _ => panic!( + "rtpdtmfsrc::nominal unexpected received buffer {}", + details(buffer_idx) + ), + } + + Ok(gst::FlowSuccess::Ok) + } + }) + .build(), + ); + + pipe.set_state(gst::State::Playing).unwrap(); + + runtime::executor::block_on({ + fn new_dtmf_start_event(number: u8, volume: u8) -> gst::Event { + gst::event::CustomUpstream::builder( + gst::Structure::builder("dtmf-event") + .field("type", 1i32) + .field("method", 1i32) + .field("start", true) + .field("number", number as i32) + .field("volume", volume as i32) + .build(), + ) + .build() + } + + fn new_dtmf_end_event() -> gst::Event { + gst::event::CustomUpstream::builder( + gst::Structure::builder("dtmf-event") + .field("type", 1i32) + .field("method", 1i32) + .field("start", false) + .build(), + ) + .build() + } + + let pipe = pipe.clone(); + let src = src.clone(); + let appsink = appsink.upcast::(); + async move { + use gst::MessageView::*; + + let mut timer = runtime::timer::interval(Duration::from_millis(30)).unwrap(); + let mut bus_stream = pipe.bus().unwrap().stream(); + + loop { + futures::select! { + _ = timer.next() => { + let mut step = step.lock().unwrap(); + match *step { + INIT => { + *step = SEND_EARLY_END; + } + SEND_EARLY_END => { + println!("rtpdtmfsrc::nominal sending early end"); + if appsink.send_event(new_dtmf_end_event()) { + panic!("Shouldn't be able to send initial end dtmf-event"); + } + *step = SEND_START_4; + } + SEND_START_4 => { + println!("rtpdtmfsrc::nominal sending start 4"); + if !appsink.send_event(new_dtmf_start_event(VALID_EVENT_4_DIGIT, VALID_EVENT_4_VOLUME)) { + panic!("Failed to send start dtmf-event {step}"); + } + *step = START_4_SENT; + } + START_4_MSG_AND_BUFFER_RECEIVED..SEND_4_END => { + // give a little time interval + *step += 1; + } + SEND_4_END => { + println!("rtpdtmfsrc::nominal sending end 4"); + if !appsink.send_event(new_dtmf_end_event()) { + panic!("Failed to send start dtmf-event {step}"); + } + *step = END_4_SENT; + } + END_4_MSG_AND_BUFFER_RECEIVED..SEND_START_2 => { + // give a little time interval + *step += 1; + } + SEND_START_2 => { + println!("rtpdtmfsrc::nominal sending start 2"); + if !appsink.send_event(new_dtmf_start_event(VALID_EVENT_2_DIGIT, VALID_EVENT_2_VOLUME)) { + panic!("Failed to send start dtmf-event {step}"); + } + *step = START_2_SENT; + } + START_2_MSG_AND_BUFFER_RECEIVED..SEND_2_END => { + // give a little time interval + *step += 1; + } + SEND_2_END => { + println!("rtpdtmfsrc::nominal sending end 2"); + if !appsink.send_event(new_dtmf_end_event()) { + panic!("Failed to send start dtmf-event {step}"); + } + *step = END_2_SENT; + } + END_2_MSG_AND_BUFFER_RECEIVED => { + break; + } + _ => (), + } + } + // _ = eos_rx => { + // println!("rtpdtmfsrc::nominal"); + // } + msg = bus_stream.next() => { + let Some(msg) = msg else { continue }; + match msg.view() { + Element(_) => { + if msg.src().is_some_and(|obj| *obj == src) { + let mut step = step.lock().unwrap(); + match *step { + START_4_SENT | START_4_BUFFER_RECEIVED => { + let s = msg.structure().unwrap(); + assert_eq!(s.name(), "dtmf-event-processed"); + assert!(s.get::("start").unwrap()); + assert_eq!(s.get::("number").unwrap(), VALID_EVENT_4_DIGIT as i32); + assert_eq!(s.get::("volume").unwrap(), VALID_EVENT_4_VOLUME as i32); + println!("rtpdtmfsrc::nominal start 4 msg received"); + *step = if *step == START_4_SENT { + START_4_MSG_RECEIVED + } else { + START_4_MSG_AND_BUFFER_RECEIVED + }; + } + END_4_SENT | END_4_BUFFER_RECEIVED => { + let s = msg.structure().unwrap(); + assert_eq!(s.name(), "dtmf-event-processed"); + assert!(!s.get::("start").unwrap()); + println!("rtpdtmfsrc::nominal end 4 msg received"); + *step = if *step == END_4_SENT { + END_4_MSG_RECEIVED + } else { + END_4_MSG_AND_BUFFER_RECEIVED + }; + } + START_2_SENT | START_2_BUFFER_RECEIVED => { + let s = msg.structure().unwrap(); + assert_eq!(s.name(), "dtmf-event-processed"); + assert!(s.get::("start").unwrap()); + assert_eq!(s.get::("number").unwrap(), VALID_EVENT_2_DIGIT as i32); + assert_eq!(s.get::("volume").unwrap(), VALID_EVENT_2_VOLUME as i32); + println!("rtpdtmfsrc::nominal start 2 msg received"); + *step = if *step == START_2_SENT { + START_2_MSG_RECEIVED + } else { + START_2_MSG_AND_BUFFER_RECEIVED + }; + } + END_2_SENT | END_2_BUFFER_RECEIVED => { + let s = msg.structure().unwrap(); + assert_eq!(s.name(), "dtmf-event-processed"); + assert!(!s.get::("start").unwrap()); + println!("rtpdtmfsrc::nominal end 2 msg received"); + *step = if *step == END_2_SENT { + END_2_MSG_RECEIVED + } else { + END_2_MSG_AND_BUFFER_RECEIVED + }; + } + _ => panic!("Unexpected ts-rtpdtmfsrc msg {msg:?}"), + } + } + } + Latency(_) => { + let _ = pipe.recalculate_latency(); + } + Error(err) => unreachable!("rtpdtmfsrc::nominal {err}"), + _ => (), + } + } + }; + } + } + }); + + pipe.set_state(gst::State::Null).unwrap(); +}