diff --git a/gst/dtmf/gstrtpdtmfsrc.c b/gst/dtmf/gstrtpdtmfsrc.c index 11bff78686..26c8d0aa65 100644 --- a/gst/dtmf/gstrtpdtmfsrc.c +++ b/gst/dtmf/gstrtpdtmfsrc.c @@ -45,7 +45,7 @@ * * * - * + * * * * Name @@ -54,7 +54,7 @@ * Purpose * * - * + * * * * type @@ -98,12 +98,12 @@ * * * - * + * * For example, the following code informs the pipeline (and in turn, the * RTPDTMFSrc element inside the pipeline) about the start of an RTP DTMF named * event '1' of volume -25 dBm0: * - * + * * * * structure = gst_structure_new ("dtmf-event", @@ -111,7 +111,7 @@ * "number", G_TYPE_INT, 1, * "volume", G_TYPE_INT, 25, * "start", G_TYPE_BOOLEAN, TRUE, NULL); - * + * * event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, structure); * gst_element_send_event (pipeline, event); * @@ -127,6 +127,8 @@ #include #include +#include + #include "gstrtpdtmfsrc.h" #define GST_RTP_DTMF_TYPE_EVENT 1 @@ -146,6 +148,10 @@ #define MAX_VOLUME 36 #define MIN_EVENT_DURATION 50 +#define MIN_INTER_DIGIT_INTERVAL 50 +#define MIN_PULSE_DURATION 70 +#define MIN_DUTY_CYCLE (MIN_INTER_DIGIT_INTERVAL + MIN_PULSE_DURATION) + #define DEFAULT_PACKET_REDUNDANCY 1 #define MIN_PACKET_REDUNDANCY 1 #define MAX_PACKET_REDUNDANCY 5 @@ -188,8 +194,8 @@ GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS ("application/x-rtp, " "media = (string) \"audio\", " "payload = (int) [ 96, 127 ], " - "clock-rate = (int) [ 0, MAX ], " - "ssrc = (int) [ 0, MAX ], " + "clock-rate = (int) [ 0, MAX ], " + "ssrc = (int) [ 0, MAX ], " "events = (int) [ " MIN_EVENT_STRING ", " MAX_EVENT_STRING " ], " "encoding-name = (string) \"telephone-event\"") ); @@ -233,22 +239,25 @@ static gboolean gst_rtp_dtmf_src_handle_event (GstPad * pad, GstEvent * event); static GstStateChangeReturn gst_rtp_dtmf_src_change_state (GstElement * element, GstStateChange transition); static void gst_rtp_dtmf_src_push_next_rtp_packet (GstRTPDTMFSrc *dtmfsrc); -static void gst_rtp_dtmf_src_start (GstRTPDTMFSrc *dtmfsrc, gint event_number, - gint event_volume); +static void gst_rtp_dtmf_src_start (GstRTPDTMFSrc *dtmfsrc); static void gst_rtp_dtmf_src_stop (GstRTPDTMFSrc *dtmfsrc); +static void gst_rtp_dtmf_src_add_start_event (GstRTPDTMFSrc *dtmfsrc, + gint event_number, gint event_volume); +static void gst_rtp_dtmf_src_add_stop_event (GstRTPDTMFSrc *dtmfsrc); static void gst_rtp_dtmf_src_set_caps (GstRTPDTMFSrc *dtmfsrc); + static void gst_rtp_dtmf_src_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - + GST_DEBUG_CATEGORY_INIT (gst_rtp_dtmf_src_debug, "rtpdtmfsrc", 0, "rtpdtmfsrc element"); - + gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&gst_rtp_dtmf_src_template)); - + gst_element_class_set_details (element_class, &gst_rtp_dtmf_src_details); } @@ -264,7 +273,7 @@ gst_rtp_dtmf_src_class_init (GstRTPDTMFSrcClass * klass) parent_class = g_type_class_peek_parent (klass); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_src_finalize); - gobject_class->set_property = + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_src_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_src_get_property); @@ -321,16 +330,19 @@ gst_rtp_dtmf_src_init (GstRTPDTMFSrc * dtmfsrc, gpointer g_class) gst_element_add_pad (GST_ELEMENT (dtmfsrc), dtmfsrc->srcpad); gst_pad_set_event_function (dtmfsrc->srcpad, gst_rtp_dtmf_src_handle_event); - + dtmfsrc->ssrc = DEFAULT_SSRC; dtmfsrc->seqnum_offset = DEFAULT_SEQNUM_OFFSET; dtmfsrc->ts_offset = DEFAULT_TIMESTAMP_OFFSET; dtmfsrc->pt = DEFAULT_PT; dtmfsrc->clock_rate = DEFAULT_CLOCK_RATE; - dtmfsrc->payload = NULL; dtmfsrc->interval = DEFAULT_PACKET_INTERVAL; dtmfsrc->packet_redundancy = DEFAULT_PACKET_REDUNDANCY; - + + + dtmfsrc->event_queue = g_async_queue_new (); + dtmfsrc->last_event = NULL; + GST_DEBUG_OBJECT (dtmfsrc, "init done"); } @@ -339,6 +351,12 @@ gst_rtp_dtmf_src_finalize (GObject * object) { GstRTPDTMFSrc *dtmfsrc; + + if (dtmfsrc->event_queue) { + g_async_queue_unref (dtmfsrc->event_queue); + dtmfsrc->event_queue = NULL; + } + dtmfsrc = GST_RTP_DTMF_SRC (object); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -373,12 +391,12 @@ gst_rtp_dtmf_src_handle_dtmf_event (GstRTPDTMFSrc *dtmfsrc, GST_DEBUG_OBJECT (dtmfsrc, "Received start event %d with volume %d", event_number, event_volume); - gst_rtp_dtmf_src_start (dtmfsrc, event_number, event_volume); + gst_rtp_dtmf_src_add_start_event (dtmfsrc, event_number, event_volume); } else { GST_DEBUG_OBJECT (dtmfsrc, "Received stop event"); - gst_rtp_dtmf_src_stop (dtmfsrc); + gst_rtp_dtmf_src_add_stop_event (dtmfsrc); } return TRUE; @@ -391,6 +409,7 @@ gst_rtp_dtmf_src_handle_custom_upstream (GstRTPDTMFSrc *dtmfsrc, GstEvent * event) { gboolean result = FALSE; + gchar *struct_str; const GstStructure *structure; if (GST_STATE (dtmfsrc) != GST_STATE_PLAYING) { @@ -400,6 +419,9 @@ gst_rtp_dtmf_src_handle_custom_upstream (GstRTPDTMFSrc *dtmfsrc, GST_DEBUG_OBJECT (dtmfsrc, "Received event is of our interest"); structure = gst_event_get_structure (event); + struct_str = gst_structure_to_string (structure); + GST_DEBUG_OBJECT (dtmfsrc, "Event has structure %s", struct_str); + g_free (struct_str); if (structure && gst_structure_has_name (structure, "dtmf-event")) result = gst_rtp_dtmf_src_handle_dtmf_event (dtmfsrc, structure); @@ -553,7 +575,8 @@ gst_rtp_dtmf_prepare_timestamps (GstRTPDTMFSrc *dtmfsrc) clock = GST_ELEMENT_CLOCK (dtmfsrc); if (clock != NULL) - dtmfsrc->timestamp = gst_clock_get_time (GST_ELEMENT_CLOCK (dtmfsrc)); + dtmfsrc->timestamp = gst_clock_get_time (GST_ELEMENT_CLOCK (dtmfsrc)) + + (MIN_INTER_DIGIT_INTERVAL * GST_MSECOND); else { GST_ERROR_OBJECT (dtmfsrc, "No clock set for element %s", @@ -569,23 +592,10 @@ gst_rtp_dtmf_prepare_timestamps (GstRTPDTMFSrc *dtmfsrc) } static void -gst_rtp_dtmf_src_start (GstRTPDTMFSrc *dtmfsrc, - gint event_number, gint event_volume) +gst_rtp_dtmf_src_start (GstRTPDTMFSrc *dtmfsrc) { - g_return_if_fail (dtmfsrc->payload == NULL); - - dtmfsrc->payload = g_new0 (GstRTPDTMFPayload, 1); - dtmfsrc->payload->event = CLAMP (event_number, MIN_EVENT, MAX_EVENT); - dtmfsrc->payload->volume = CLAMP (event_volume, MIN_VOLUME, MAX_VOLUME); - dtmfsrc->first_packet = TRUE; - dtmfsrc->last_packet = FALSE; - - gst_rtp_dtmf_prepare_timestamps (dtmfsrc); gst_rtp_dtmf_src_set_caps (dtmfsrc); - /* Don't forget to get exclusive access to the stream */ - gst_rtp_dtmf_src_set_stream_lock (dtmfsrc, TRUE); - if (!gst_pad_start_task (dtmfsrc->srcpad, (GstTaskFunction) gst_rtp_dtmf_src_push_next_rtp_packet, dtmfsrc)) { GST_ERROR_OBJECT (dtmfsrc, "Failed to start task on src pad"); @@ -595,19 +605,65 @@ gst_rtp_dtmf_src_start (GstRTPDTMFSrc *dtmfsrc, static void gst_rtp_dtmf_src_stop (GstRTPDTMFSrc *dtmfsrc) { - g_return_if_fail (dtmfsrc->payload != NULL); + /* Don't forget to release the stream lock */ + gst_rtp_dtmf_src_set_stream_lock (dtmfsrc, FALSE); - /* Push the last packet with e-bit set */ - /* Next packet sent will be the last */ - dtmfsrc->last_packet = TRUE; + /* Flushing the event queue */ + GstRTPDTMFSrcEvent *event = g_async_queue_try_pop (dtmfsrc->event_queue); + + while (event != NULL) { + g_free (event); + event = g_async_queue_try_pop (dtmfsrc->event_queue); + } + + if (dtmfsrc->last_event) { + g_free (dtmfsrc->last_event); + dtmfsrc->last_event = NULL; + } + + if (!gst_pad_pause_task (dtmfsrc->srcpad)) { + GST_ERROR_OBJECT (dtmfsrc, "Failed to pause task on src pad"); + return; + } } + + +static void +gst_rtp_dtmf_src_add_start_event (GstRTPDTMFSrc *dtmfsrc, gint event_number, + gint event_volume) +{ + + GstRTPDTMFSrcEvent * event = g_malloc (sizeof(GstRTPDTMFSrcEvent)); + event->event_type = RTP_DTMF_EVENT_TYPE_START; + + event->payload = g_new0 (GstRTPDTMFPayload, 1); + event->payload->event = CLAMP (event_number, MIN_EVENT, MAX_EVENT); + event->payload->volume = CLAMP (event_volume, MIN_VOLUME, MAX_VOLUME); + + g_async_queue_push (dtmfsrc->event_queue, event); +} + +static void +gst_rtp_dtmf_src_add_stop_event (GstRTPDTMFSrc *dtmfsrc) +{ + + GstRTPDTMFSrcEvent * event = g_malloc (sizeof(GstRTPDTMFSrcEvent)); + event->event_type = RTP_DTMF_EVENT_TYPE_STOP; + event->payload = g_new0 (GstRTPDTMFPayload, 1); + event->payload->event = 0; + event->payload->volume = 0; + + g_async_queue_push (dtmfsrc->event_queue, event); +} + + static void gst_rtp_dtmf_src_wait_for_buffer_ts (GstRTPDTMFSrc *dtmfsrc, GstBuffer * buf) { GstClock *clock; - + clock = GST_ELEMENT_CLOCK (dtmfsrc); if (clock != NULL) { GstClockID clock_id; @@ -629,7 +685,7 @@ gst_rtp_dtmf_src_wait_for_buffer_ts (GstRTPDTMFSrc *dtmfsrc, GstBuffer * buf) } static void -gst_rtp_dtmf_prepare_rtp_headers (GstRTPDTMFSrc *dtmfsrc, GstBuffer *buf) +gst_rtp_dtmf_prepare_rtp_headers (GstRTPDTMFSrc *dtmfsrc, GstRTPDTMFSrcEvent *event, GstBuffer *buf) { gst_rtp_buffer_set_ssrc (buf, dtmfsrc->current_ssrc); gst_rtp_buffer_set_payload_type (buf, dtmfsrc->pt); @@ -637,37 +693,37 @@ gst_rtp_dtmf_prepare_rtp_headers (GstRTPDTMFSrc *dtmfsrc, GstBuffer *buf) gst_rtp_buffer_set_marker (buf, TRUE); dtmfsrc->first_packet = FALSE; } else if (dtmfsrc->last_packet) { - dtmfsrc->payload->e = 1; + event->payload->e = 1; dtmfsrc->last_packet = FALSE; } dtmfsrc->seqnum++; gst_rtp_buffer_set_seq (buf, dtmfsrc->seqnum); - + /* timestamp of RTP header */ gst_rtp_buffer_set_timestamp (buf, dtmfsrc->rtp_timestamp); } static void -gst_rtp_dtmf_prepare_buffer_data (GstRTPDTMFSrc *dtmfsrc, GstBuffer *buf) +gst_rtp_dtmf_prepare_buffer_data (GstRTPDTMFSrc *dtmfsrc, GstRTPDTMFSrcEvent *event,GstBuffer *buf) { GstRTPDTMFPayload *payload; - - gst_rtp_dtmf_prepare_rtp_headers (dtmfsrc, buf); + + gst_rtp_dtmf_prepare_rtp_headers (dtmfsrc,event, buf); /* duration of DTMF payload */ - dtmfsrc->payload->duration += + event->payload->duration += dtmfsrc->interval * dtmfsrc->clock_rate / 1000; - /* timestamp and duration of GstBuffer */ + /* timestamp and duration of GstBuffer */ GST_BUFFER_DURATION (buf) = dtmfsrc->interval * GST_MSECOND; GST_BUFFER_TIMESTAMP (buf) = dtmfsrc->timestamp; dtmfsrc->timestamp += GST_BUFFER_DURATION (buf); - + payload = (GstRTPDTMFPayload *) gst_rtp_buffer_get_payload (buf); - + /* copy payload and convert to network-byte order */ - g_memmove (payload, dtmfsrc->payload, sizeof (GstRTPDTMFPayload)); + g_memmove (payload, event->payload, sizeof (GstRTPDTMFPayload)); /* Force the packet duration to a certain minumum * if its the end of the event */ @@ -679,18 +735,20 @@ gst_rtp_dtmf_prepare_buffer_data (GstRTPDTMFSrc *dtmfsrc, GstBuffer *buf) } static GstBuffer * -gst_rtp_dtmf_src_create_next_rtp_packet (GstRTPDTMFSrc *dtmfsrc) +gst_rtp_dtmf_src_create_next_rtp_packet (GstRTPDTMFSrc *dtmfsrc, GstRTPDTMFSrcEvent *event) { GstBuffer *buf = NULL; - + /* create buffer to hold the payload */ buf = gst_rtp_buffer_new_allocate (sizeof (GstRTPDTMFPayload), 0, 0); - gst_rtp_dtmf_prepare_buffer_data (dtmfsrc, buf); + gst_rtp_dtmf_prepare_buffer_data (dtmfsrc, event, buf); /* FIXME: Should we sync to clock ourselves or leave it to sink */ gst_rtp_dtmf_src_wait_for_buffer_ts (dtmfsrc, buf); + event->sent_packets++; + /* Set caps on the buffer before pushing it */ gst_buffer_set_caps (buf, GST_PAD_CAPS (dtmfsrc->srcpad)); @@ -703,58 +761,93 @@ gst_rtp_dtmf_src_push_next_rtp_packet (GstRTPDTMFSrc *dtmfsrc) GstBuffer *buf = NULL; GstFlowReturn ret; gint redundancy_count = 1; + GstRTPDTMFSrcEvent *event; - if (dtmfsrc->first_packet == TRUE || dtmfsrc->last_packet == TRUE) { - redundancy_count = dtmfsrc->packet_redundancy; + g_async_queue_ref (dtmfsrc->event_queue); + + if (dtmfsrc->last_event == NULL) { + event = g_async_queue_pop (dtmfsrc->event_queue); + + if (event->event_type == RTP_DTMF_EVENT_TYPE_STOP) { + GST_WARNING_OBJECT (dtmfsrc, "Received a DTMF stop event when already stopped"); + } else if (event->event_type == RTP_DTMF_EVENT_TYPE_START) { + + dtmfsrc->first_packet = TRUE; + dtmfsrc->last_packet = FALSE; + gst_rtp_dtmf_prepare_timestamps (dtmfsrc); + + /* Don't forget to get exclusive access to the stream */ + gst_rtp_dtmf_src_set_stream_lock (dtmfsrc, TRUE); + + event->sent_packets = 0; + + dtmfsrc->last_event = event; + } + } else if (dtmfsrc->last_event->sent_packets * dtmfsrc->interval >= MIN_PULSE_DURATION){ + event = g_async_queue_try_pop (dtmfsrc->event_queue); + + if (event != NULL) { + if (event->event_type == RTP_DTMF_EVENT_TYPE_START) { + GST_WARNING_OBJECT (dtmfsrc, "Received two consecutive DTMF start events"); + } else if (event->event_type == RTP_DTMF_EVENT_TYPE_STOP) { + dtmfsrc->first_packet = FALSE; + dtmfsrc->last_packet = TRUE; + } + } + } + g_async_queue_unref (dtmfsrc->event_queue); + + if (dtmfsrc->last_event) { + + if (dtmfsrc->first_packet == TRUE || dtmfsrc->last_packet == TRUE) { + redundancy_count = dtmfsrc->packet_redundancy; + + if(dtmfsrc->first_packet == TRUE) { + GST_DEBUG_OBJECT (dtmfsrc, + "redundancy count set to %d due to dtmf start", + redundancy_count); + } else if(dtmfsrc->last_packet == TRUE) { + GST_DEBUG_OBJECT (dtmfsrc, + "redundancy count set to %d due to dtmf stop", + redundancy_count); + } - if(dtmfsrc->first_packet == TRUE) { - GST_DEBUG_OBJECT (dtmfsrc, - "redundancy count set to %d due to dtmf start", - redundancy_count); - } else if(dtmfsrc->last_packet == TRUE) { - GST_DEBUG_OBJECT (dtmfsrc, - "redundancy count set to %d due to dtmf stop", - redundancy_count); } - } + /* create buffer to hold the payload */ + buf = gst_rtp_dtmf_src_create_next_rtp_packet (dtmfsrc, dtmfsrc->last_event); - /* create buffer to hold the payload */ - buf = gst_rtp_dtmf_src_create_next_rtp_packet (dtmfsrc); + while ( redundancy_count-- ) { + gst_buffer_ref(buf); - while ( redundancy_count-- ) { - gst_buffer_ref(buf); + GST_DEBUG_OBJECT (dtmfsrc, + "pushing buffer on src pad of size %d with redundancy count %d", + GST_BUFFER_SIZE (buf), redundancy_count); + ret = gst_pad_push (dtmfsrc->srcpad, buf); + if (ret != GST_FLOW_OK) + GST_ERROR_OBJECT (dtmfsrc, + "Failed to push buffer on src pad"); + /* Make sure only the first packet sent has the marker set */ + gst_rtp_buffer_set_marker (buf, FALSE); + } + + gst_buffer_unref(buf); GST_DEBUG_OBJECT (dtmfsrc, - "pushing buffer on src pad of size %d with redundancy count %d", - GST_BUFFER_SIZE (buf), redundancy_count); - ret = gst_pad_push (dtmfsrc->srcpad, buf); - if (ret != GST_FLOW_OK) - GST_ERROR_OBJECT (dtmfsrc, - "Failed to push buffer on src pad", GST_BUFFER_SIZE (buf)); + "pushed DTMF event '%d' on src pad", event->payload->event); - /* Make sure only the first packet sent has the marker set */ - gst_rtp_buffer_set_marker (buf, FALSE); - } + if (dtmfsrc->last_event->payload->e) { + /* Don't forget to release the stream lock */ + gst_rtp_dtmf_src_set_stream_lock (dtmfsrc, FALSE); - gst_buffer_unref(buf); - GST_DEBUG_OBJECT (dtmfsrc, - "pushed DTMF event '%d' on src pad", dtmfsrc->payload->event); + g_free (dtmfsrc->last_event->payload); + event->payload = NULL; - if (dtmfsrc->payload->e) { - /* Don't forget to release the stream lock */ - gst_rtp_dtmf_src_set_stream_lock (dtmfsrc, FALSE); + g_free (dtmfsrc->last_event); + dtmfsrc->last_event = NULL; - g_free (dtmfsrc->payload); - dtmfsrc->payload = NULL; - - if (!gst_pad_pause_task (dtmfsrc->srcpad)) { - GST_ERROR_OBJECT (dtmfsrc, "Failed to pause task on src pad"); - return; } - } - } static void @@ -785,7 +878,7 @@ static void gst_rtp_dtmf_src_ready_to_paused (GstRTPDTMFSrc *dtmfsrc) { gst_segment_init (&dtmfsrc->segment, GST_FORMAT_UNDEFINED); - + if (dtmfsrc->ssrc == -1) dtmfsrc->current_ssrc = g_random_int (); else @@ -796,7 +889,7 @@ gst_rtp_dtmf_src_ready_to_paused (GstRTPDTMFSrc *dtmfsrc) else dtmfsrc->seqnum_base = dtmfsrc->seqnum_offset; dtmfsrc->seqnum = dtmfsrc->seqnum_base; - + if (dtmfsrc->ts_offset == -1) dtmfsrc->ts_base = g_random_int (); else @@ -818,6 +911,9 @@ gst_rtp_dtmf_src_change_state (GstElement * element, GstStateChange transition) /* Indicate that we don't do PRE_ROLL */ no_preroll = TRUE; break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + gst_rtp_dtmf_src_start (dtmfsrc); + break; default: break; } @@ -831,6 +927,7 @@ gst_rtp_dtmf_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* Indicate that we don't do PRE_ROLL */ no_preroll = TRUE; + gst_rtp_dtmf_src_stop (dtmfsrc); break; default: break; diff --git a/gst/dtmf/gstrtpdtmfsrc.h b/gst/dtmf/gstrtpdtmfsrc.h index 8a763cce0c..2bbfb9db88 100644 --- a/gst/dtmf/gstrtpdtmfsrc.h +++ b/gst/dtmf/gstrtpdtmfsrc.h @@ -57,6 +57,23 @@ typedef struct { typedef struct _GstRTPDTMFSrc GstRTPDTMFSrc; typedef struct _GstRTPDTMFSrcClass GstRTPDTMFSrcClass; + + +static enum _GstRTPDTMFEventType { + RTP_DTMF_EVENT_TYPE_START, + RTP_DTMF_EVENT_TYPE_STOP +}; + +typedef enum _GstRTPDTMFEventType GstRTPDTMFEventType; + +struct _GstRTPDTMFSrcEvent { + GstRTPDTMFEventType event_type; + GstRTPDTMFPayload* payload; + guint32 sent_packets; +}; + +typedef struct _GstRTPDTMFSrcEvent GstRTPDTMFSrcEvent; + /** * GstRTPDTMFSrc: * @element: the parent element. @@ -64,30 +81,28 @@ typedef struct _GstRTPDTMFSrcClass GstRTPDTMFSrcClass; * The opaque #GstRTPDTMFSrc data structure. */ struct _GstRTPDTMFSrc { - GstElement element; + GstElement element; - GstPad *srcpad; - GstRTPDTMFPayload *payload; + GstPad* srcpad; + GstSegment segment; + GAsyncQueue* event_queue; + GstRTPDTMFSrcEvent* last_event; + GstClockTime timestamp; + gboolean first_packet; + gboolean last_packet; guint32 ts_base; guint16 seqnum_base; - gint16 seqnum_offset; guint16 seqnum; gint32 ts_offset; guint32 rtp_timestamp; - guint32 clock_rate; guint pt; guint ssrc; guint current_ssrc; - gboolean first_packet; - gboolean last_packet; - - GstClockTime timestamp; - GstSegment segment; - guint16 interval; guint16 packet_redundancy; + guint32 clock_rate; }; struct _GstRTPDTMFSrcClass {