diff --git a/subprojects/gst-plugins-good/docs/gst_plugins_cache.json b/subprojects/gst-plugins-good/docs/gst_plugins_cache.json index 9551f5b057..d558a53e71 100644 --- a/subprojects/gst-plugins-good/docs/gst_plugins_cache.json +++ b/subprojects/gst-plugins-good/docs/gst_plugins_cache.json @@ -15178,6 +15178,17 @@ } }, "properties": { + "payloads": { + "blurb": "All the RED payloads this decoder may encounter", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "null", + "readable": true, + "type": "GstValueArray", + "writable": true + }, "pt": { "blurb": "Payload type FEC packets", "conditionally-available": false, diff --git a/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.c b/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.c index c9482cbefa..b4ff0d1999 100644 --- a/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.c +++ b/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.c @@ -90,7 +90,8 @@ enum { PROP_0, PROP_PT, - PROP_RECEIVED + PROP_RECEIVED, + PROP_PAYLOADS, }; static RTPHistItem * @@ -123,7 +124,8 @@ gst_rtp_red_history_find_less (gconstpointer item, gconstpointer timestamp) } static void -gst_rtp_red_history_update (GstRtpRedDec * self, GstRTPBuffer * rtp) +gst_rtp_red_history_update (GstRtpRedDec * self, GQueue * rtp_history, + GstRTPBuffer * rtp) { RTPHistItem *item; GList *link, *sibling; @@ -132,12 +134,12 @@ gst_rtp_red_history_update (GstRtpRedDec * self, GstRTPBuffer * rtp) * allocate a new link and a new item, * otherwise reuse the tail (the oldest data) without any reallocations */ - if (self->rtp_history->length < RTP_HISTORY_MAX_SIZE) { + if (rtp_history->length < RTP_HISTORY_MAX_SIZE) { item = rtp_hist_item_alloc (); link = g_list_alloc (); link->data = item; } else { - link = g_queue_pop_tail_link (self->rtp_history); + link = g_queue_pop_tail_link (rtp_history); item = link->data; } @@ -147,11 +149,11 @@ gst_rtp_red_history_update (GstRtpRedDec * self, GstRTPBuffer * rtp) /* Looking for a place to insert new link. * The queue has newest to oldest rtp timestamps, so in 99% cases * it is inserted before the head of the queue */ - sibling = g_list_find_custom (self->rtp_history->head, + sibling = g_list_find_custom (rtp_history->head, GUINT_TO_POINTER (item->timestamp), gst_rtp_red_history_find_less_or_equal); - g_queue_push_nth_link (self->rtp_history, - g_list_position (self->rtp_history->head, sibling), link); + g_queue_push_nth_link (rtp_history, + g_list_position (rtp_history->head, sibling), link); } static gboolean @@ -219,9 +221,9 @@ red_buffer_invalid: static gboolean gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self, - guint32 timestamp, guint16 * dst_seq_num) + GQueue * rtp_history, guint32 timestamp, guint16 * dst_seq_num) { - GList *older_sibling = g_list_find_custom (self->rtp_history->head, + GList *older_sibling = g_list_find_custom (rtp_history->head, GUINT_TO_POINTER (timestamp), gst_rtp_red_history_find_less); RTPHistItem *older; @@ -230,19 +232,19 @@ gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self, gint seq_diff, lost_packet_idx; if (NULL == older_sibling) { - if (self->rtp_history->length == RTP_HISTORY_MAX_SIZE) + if (rtp_history->length == RTP_HISTORY_MAX_SIZE) GST_WARNING_OBJECT (self, "History is too short. " "Oldest rtp timestamp %u, looking for %u, size %u", - RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->tail->data), - timestamp, self->rtp_history->length); + RTP_HIST_ITEM_TIMESTAMP (rtp_history->tail->data), + timestamp, rtp_history->length); return FALSE; } if (NULL == older_sibling->prev) { GST_WARNING_OBJECT (self, "RED block timestamp offset probably wrong. " "Latest rtp timestamp %u, looking for %u, size %u", - RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->head->data), - timestamp, self->rtp_history->length); + RTP_HIST_ITEM_TIMESTAMP (rtp_history->head->data), + timestamp, rtp_history->length); return FALSE; } @@ -316,7 +318,8 @@ gst_rtp_red_create_packet (GstRtpRedDec * self, GstRTPBuffer * red_rtp, static GstBuffer * gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self, - GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset) + GQueue * rtp_history, GstRTPBuffer * red_rtp, gsize * red_hdr_offset, + gsize * red_payload_offset) { guint8 *payload = gst_rtp_buffer_get_payload (red_rtp); guint8 *red_hdr = payload + *red_hdr_offset; @@ -325,11 +328,12 @@ gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self, GstBuffer *ret = NULL; guint16 lost_seq = 0; - if (gst_red_history_lost_seq_num_for_timestamp (self, lost_timestamp, - &lost_seq)) { - GST_LOG_OBJECT (self, "Recovering from RED packet pt=%u ts=%u seq=%u" - " len=%u present", rtp_red_block_get_payload_type (red_hdr), - lost_timestamp, lost_seq, rtp_red_block_get_payload_length (red_hdr)); + if (gst_red_history_lost_seq_num_for_timestamp (self, rtp_history, + lost_timestamp, &lost_seq)) { + GST_LOG_OBJECT (self, + "Recovering from RED packet pt=%u ts=%u seq=%u" " len=%u present", + rtp_red_block_get_payload_type (red_hdr), lost_timestamp, lost_seq, + rtp_red_block_get_payload_length (red_hdr)); ret = gst_rtp_red_create_packet (self, red_rtp, FALSE, rtp_red_block_get_payload_type (red_hdr), lost_seq, lost_timestamp, @@ -367,13 +371,13 @@ gst_rtp_red_create_from_main_block (GstRtpRedDec * self, } static GstBuffer * -gst_rtp_red_create_from_block (GstRtpRedDec * self, GstRTPBuffer * red_rtp, - gsize * red_hdr_offset, gsize * red_payload_offset) +gst_rtp_red_create_from_block (GstRtpRedDec * self, GQueue * rtp_history, + GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset) { guint8 *payload = gst_rtp_buffer_get_payload (red_rtp); if (rtp_red_block_is_redundant (payload + (*red_hdr_offset))) - return gst_rtp_red_create_from_redundant_block (self, red_rtp, + return gst_rtp_red_create_from_redundant_block (self, rtp_history, red_rtp, red_hdr_offset, red_payload_offset); return gst_rtp_red_create_from_main_block (self, red_rtp, *red_hdr_offset, @@ -381,8 +385,8 @@ gst_rtp_red_create_from_block (GstRtpRedDec * self, GstRTPBuffer * red_rtp, } static GstFlowReturn -gst_rtp_red_process (GstRtpRedDec * self, GstRTPBuffer * red_rtp, - gsize first_red_payload_offset) +gst_rtp_red_process (GstRtpRedDec * self, GQueue * rtp_history, + GstRTPBuffer * red_rtp, gsize first_red_payload_offset) { gsize red_hdr_offset = 0; gsize red_payload_offset = first_red_payload_offset; @@ -390,8 +394,8 @@ gst_rtp_red_process (GstRtpRedDec * self, GstRTPBuffer * red_rtp, GstFlowReturn ret = GST_FLOW_OK; do { - GstBuffer *buf = - gst_rtp_red_create_from_block (self, red_rtp, &red_hdr_offset, + GstBuffer *buf = gst_rtp_red_create_from_block (self, rtp_history, red_rtp, + &red_hdr_offset, &red_payload_offset); if (buf) ret = gst_pad_push (self->srcpad, buf); @@ -400,6 +404,25 @@ gst_rtp_red_process (GstRtpRedDec * self, GstRTPBuffer * red_rtp, return ret; } +static gboolean +is_red_pt (GstRtpRedDec * self, guint8 pt) +{ + gboolean ret; + + g_mutex_lock (&self->lock); + if (pt == self->pt) { + ret = TRUE; + goto done; + } + + ret = self->payloads + && g_hash_table_contains (self->payloads, GINT_TO_POINTER (pt)); + +done: + g_mutex_unlock (&self->lock); + return ret; +} + static GstFlowReturn gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { @@ -407,16 +430,27 @@ gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstRTPBuffer irtp = GST_RTP_BUFFER_INIT; GstFlowReturn ret = GST_FLOW_OK; gsize first_red_payload_offset = 0; + GQueue *rtp_history; + guint32 ssrc; - if (self->pt == UNDEF_PT) + if (self->pt == UNDEF_PT && self->payloads == NULL) return gst_pad_push (self->srcpad, buffer); if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &irtp)) return gst_pad_push (self->srcpad, buffer); - gst_rtp_red_history_update (self, &irtp); + ssrc = gst_rtp_buffer_get_ssrc (&irtp); - if (self->pt != gst_rtp_buffer_get_payload_type (&irtp)) { + if (!(rtp_history = + g_hash_table_lookup (self->rtp_histories, GUINT_TO_POINTER (ssrc)))) { + rtp_history = g_queue_new (); + g_hash_table_insert (self->rtp_histories, GUINT_TO_POINTER (ssrc), + rtp_history); + } + + gst_rtp_red_history_update (self, rtp_history, &irtp); + + if (!is_red_pt (self, gst_rtp_buffer_get_payload_type (&irtp))) { GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &irtp); gst_rtp_buffer_unmap (&irtp); @@ -427,7 +461,9 @@ gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (rtp_red_buffer_is_valid (self, &irtp, &first_red_payload_offset)) { GST_DEBUG_RTP_PACKET (self, "rtp header (red)", &irtp); - ret = gst_rtp_red_process (self, &irtp, first_red_payload_offset); + ret = + gst_rtp_red_process (self, rtp_history, &irtp, + first_red_payload_offset); } gst_rtp_buffer_unmap (&irtp); @@ -440,11 +476,23 @@ gst_rtp_red_dec_dispose (GObject * obj) { GstRtpRedDec *self = GST_RTP_RED_DEC (obj); - g_queue_free_full (self->rtp_history, rtp_hist_item_free); + g_hash_table_unref (self->rtp_histories); + + if (self->payloads) { + g_hash_table_unref (self->payloads); + } + + g_mutex_clear (&self->lock); G_OBJECT_CLASS (gst_rtp_red_dec_parent_class)->dispose (obj); } +static void +free_rtp_history (GQueue * rtp_history) +{ + g_queue_free_full (rtp_history, rtp_hist_item_free); +} + static void gst_rtp_red_dec_init (GstRtpRedDec * self) { @@ -466,10 +514,13 @@ gst_rtp_red_dec_init (GstRtpRedDec * self) self->pt = DEFAULT_PT; self->num_received = 0; - self->rtp_history = g_queue_new (); + self->rtp_histories = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) free_rtp_history); + self->payloads = NULL; + g_mutex_init (&self->lock); } - static void gst_rtp_red_dec_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -478,14 +529,51 @@ gst_rtp_red_dec_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_PT: + g_mutex_lock (&self->lock); self->pt = g_value_get_int (value); + g_mutex_unlock (&self->lock); break; + case PROP_PAYLOADS: + { + guint i, n_vals; + + g_mutex_lock (&self->lock); + if (self->payloads) { + g_hash_table_unref (self->payloads); + self->payloads = NULL; + } + + n_vals = gst_value_array_get_size (value); + + if (n_vals > 0) { + self->payloads = g_hash_table_new (g_direct_hash, g_direct_equal); + + for (i = 0; i < gst_value_array_get_size (value); i++) { + const GValue *val = gst_value_array_get_value (value, i); + + g_hash_table_insert (self->payloads, + GINT_TO_POINTER (g_value_get_int (val)), NULL); + } + } + g_mutex_unlock (&self->lock); + break; + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static void +append_payload (gpointer key, gpointer value, GValue * array) +{ + GValue v = { 0, }; + g_value_init (&v, G_TYPE_INT); + g_value_set_int (&v, GPOINTER_TO_INT (key)); + gst_value_array_append_value (array, &v); + g_value_unset (&v); +} + static void gst_rtp_red_dec_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) @@ -493,11 +581,22 @@ gst_rtp_red_dec_get_property (GObject * object, guint prop_id, GstRtpRedDec *self = GST_RTP_RED_DEC (object); switch (prop_id) { case PROP_PT: + g_mutex_lock (&self->lock); g_value_set_int (value, self->pt); + g_mutex_unlock (&self->lock); break; case PROP_RECEIVED: g_value_set_uint (value, self->num_received); break; + case PROP_PAYLOADS: + { + g_mutex_lock (&self->lock); + if (self->payloads) { + g_hash_table_foreach (self->payloads, (GHFunc) append_payload, value); + } + g_mutex_unlock (&self->lock); + break; + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -541,6 +640,27 @@ gst_rtp_red_dec_class_init (GstRtpRedDecClass * klass) "Count of received packets", 0, G_MAXUINT32, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * rtpreddec:payloads: + * + * All the RED payloads this decoder may encounter + * + * Since: 1.20 + */ + g_object_class_install_property (G_OBJECT_CLASS (klass), + PROP_PAYLOADS, + gst_param_spec_array ("payloads", + "RED payloads", + "All the RED payloads this decoder may encounter", + g_param_spec_int ("pt", + "payload type", + "A RED payload type", + MIN_PT, MAX_PT, + DEFAULT_PT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS), + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS) + ); + GST_DEBUG_CATEGORY_INIT (gst_rtp_red_dec_debug, "rtpreddec", 0, "RTP RED Decoder"); } diff --git a/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.h b/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.h index 1ab864e2c9..a1d89bf053 100644 --- a/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.h +++ b/subprojects/gst-plugins-good/gst/rtp/gstrtpreddec.h @@ -51,7 +51,14 @@ struct _GstRtpRedDec { gint pt; guint num_received; - GQueue *rtp_history; + /* Per ssrc */ + GHashTable *rtp_histories; + + /* To track all FEC payload types */ + GHashTable *payloads; + + /* Protects pt and payloads */ + GMutex lock; }; GType gst_rtp_red_dec_get_type (void);