From c917f11ae822ac69213e4388be76fe31eca17e0c Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Thu, 6 Jun 2019 14:44:27 -0400 Subject: [PATCH] rtpjitterbuffer: Move item structure outside of the element This moves the RtpJitterBufferStructure type, alloc, free into rtpjitterbuffer.c/h implementation. jitterbuffer.c strictly rely on the fact this structure is compatible with GList, and so it make more sense to keep encapsulate it. Also, anything that could possibly reduce the amount of code in the element is a win. In order to support that move, a function pointer to free the data was added. This also allow making the free function option when flushing the jitterbuffer. --- gst/rtpmanager/gstrtpjitterbuffer.c | 75 +++++++++++------------------ gst/rtpmanager/rtpjitterbuffer.c | 66 ++++++++++++++++++++++++- gst/rtpmanager/rtpjitterbuffer.h | 14 +++++- 3 files changed, 105 insertions(+), 50 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index e32b1cb4c3..4c06762746 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -1042,48 +1042,26 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) #define ITEM_TYPE_EVENT 2 #define ITEM_TYPE_QUERY 3 -static RTPJitterBufferItem * -alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, - guint seqnum, guint count, guint rtptime) +static inline RTPJitterBufferItem * +alloc_event_item (GstEvent * event) { - RTPJitterBufferItem *item; - - item = g_slice_new (RTPJitterBufferItem); - item->data = data; - item->next = NULL; - item->prev = NULL; - item->type = type; - item->dts = dts; - item->pts = pts; - item->seqnum = seqnum; - item->count = count; - item->rtptime = rtptime; - - return item; + return rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, + -1, (GDestroyNotify) gst_mini_object_unref); } static void -free_item (RTPJitterBufferItem * item) -{ - g_return_if_fail (item != NULL); - - if (item->data && item->type != ITEM_TYPE_QUERY) - gst_mini_object_unref (item->data); - g_slice_free (RTPJitterBufferItem, item); -} - -static void -free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data) +free_item_and_retain_sticky_events (RTPJitterBufferItem * item, + gpointer user_data) { GList **l = user_data; if (item->data && item->type == ITEM_TYPE_EVENT && GST_EVENT_IS_STICKY (item->data)) { *l = g_list_prepend (*l, item->data); - } else if (item->data && item->type != ITEM_TYPE_QUERY) { - gst_mini_object_unref (item->data); + item->data = NULL; } - g_slice_free (RTPJitterBufferItem, item); + + rtp_jitter_buffer_free_item (item); } static void @@ -1103,7 +1081,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object) g_cond_clear (&priv->jbuf_event); g_cond_clear (&priv->jbuf_query); - rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); + rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL); g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); g_queue_clear (&priv->gap_packets); g_object_unref (priv->jbuf); @@ -1584,7 +1562,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer) priv->equidistant = 0; priv->segment_seqnum = GST_SEQNUM_INVALID; GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); - rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); + rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL); rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE); rtp_jitter_buffer_reset_skew (priv->jbuf); remove_all_timers (jitterbuffer); @@ -1802,7 +1780,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) GST_DEBUG_OBJECT (jitterbuffer, "adding event"); - item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); + item = alloc_event_item (event); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); if (head || priv->eos) JBUF_SIGNAL_EVENT (priv); @@ -2790,7 +2768,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); rtp_jitter_buffer_flush (priv->jbuf, - (GFunc) free_item_and_retain_events, &events); + (GFunc) free_item_and_retain_sticky_events, &events); rtp_jitter_buffer_reset_skew (priv->jbuf); remove_all_timers (jitterbuffer); priv->discont = TRUE; @@ -2817,7 +2795,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer, for (l = events; l; l = l->next) { RTPJitterBufferItem *item; - item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); + item = alloc_event_item (l->data); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); } g_list_free (events); @@ -3207,7 +3185,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", old_item); priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff; - free_item (old_item); + rtp_jitter_buffer_free_item (old_item); } /* we might have removed some head buffers, signal the pushing thread to * see if it can push now */ @@ -3220,11 +3198,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * any of those is valid in the buffer, so we know that if we estimated the * dts that both are unknown */ if (estimated_dts) - item = - alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE, - pts, seqnum, 1, rtptime); + item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER, + GST_CLOCK_TIME_NONE, pts, seqnum, 1, rtptime, + (GDestroyNotify) gst_mini_object_unref); else - item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime); + item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, + seqnum, 1, rtptime, (GDestroyNotify) gst_mini_object_unref); /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we @@ -3326,7 +3305,7 @@ duplicate: GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; - free_item (item); + rtp_jitter_buffer_free_item (item); goto finished; } rtx_duplicate: @@ -3565,7 +3544,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) JBUF_UNLOCK (priv); item->data = NULL; - free_item (item); + rtp_jitter_buffer_free_item (item); if (msg) gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg); @@ -3685,7 +3664,7 @@ handle_next_buffer (GstRtpJitterBuffer * jitterbuffer) GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); item = rtp_jitter_buffer_pop (priv->jbuf, NULL); - free_item (item); + rtp_jitter_buffer_free_item (item); result = GST_FLOW_OK; } else { /* the chain function has scheduled timers to request retransmission or @@ -3968,10 +3947,11 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, "duration", G_TYPE_UINT64, duration, "retry", G_TYPE_UINT, num_rtx_retry, NULL)); } - item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1); + item = rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, + lost_packets, -1, (GDestroyNotify) gst_mini_object_unref); if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL)) /* Duplicate */ - free_item (item); + rtp_jitter_buffer_free_item (item); if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) { /* Store info to update stats if the packet arrives too late */ @@ -4478,7 +4458,8 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent, if (rtp_jitter_buffer_get_mode (priv->jbuf) != RTP_JITTER_BUFFER_MODE_BUFFER) { GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query"); - item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1); + item = rtp_jitter_buffer_alloc_item (query, ITEM_TYPE_QUERY, -1, -1, + -1, 0, -1, NULL); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); if (head) JBUF_SIGNAL_EVENT (priv); diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index defb2db3b6..85dfc44f25 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -1108,6 +1108,10 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent) else if (percent) *percent = -1; + /* let's clear the pointers so we can ensure we don't free items that are + * still in the jitterbuffer */ + item->next = item->prev = NULL; + return (RTPJitterBufferItem *) item; } @@ -1133,7 +1137,7 @@ rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf) /** * rtp_jitter_buffer_flush: * @jbuf: an #RTPJitterBuffer - * @free_func: function to free each item + * @free_func: function to free each item (optional) * @user_data: user data passed to @free_func * * Flush all packets from the jitterbuffer. @@ -1145,7 +1149,9 @@ rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf, GFunc free_func, GList *item; g_return_if_fail (jbuf != NULL); - g_return_if_fail (free_func != NULL); + + if (free_func == NULL) + free_func = (GFunc) rtp_jitter_buffer_free_item; while ((item = g_queue_pop_head_link (jbuf->packets))) free_func ((RTPJitterBufferItem *) item, user_data); @@ -1376,3 +1382,59 @@ rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf) return rtp_jitter_buffer_get_seqnum_diff (jbuf) >= 32765 && rtp_jitter_buffer_num_packets (jbuf) > 10000; } + +/** + * rtp_jitter_buffer_alloc_item: + * @data: The data stored in this item + * @type: User specific item type + * @dts: Decoding Timestamp + * @pts: Presentation Timestamp + * @seqnum: Sequence number + * @count: Number of packet this item represent + * @rtptime: The RTP specific timestamp + * @free_data: A function to free @data (optional) + * + * Create an item that can then be stored in the jitter buffer. + * + * Returns: a newly allocated RTPJitterbufferItem + */ +RTPJitterBufferItem * +rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts, + GstClockTime pts, guint seqnum, guint count, guint rtptime, + GDestroyNotify free_data) +{ + RTPJitterBufferItem *item; + + item = g_slice_new (RTPJitterBufferItem); + item->data = data; + item->next = NULL; + item->prev = NULL; + item->type = type; + item->dts = dts; + item->pts = pts; + item->seqnum = seqnum; + item->count = count; + item->rtptime = rtptime; + item->free_data = free_data; + + return item; +} + +/** + * rtp_jitter_buffer_free_item: + * @item: the item to be freed + * + * Free the jitter buffer item. + */ +void +rtp_jitter_buffer_free_item (RTPJitterBufferItem * item) +{ + g_return_if_fail (item != NULL); + /* needs to be unlinked first */ + g_return_if_fail (item->next == NULL); + g_return_if_fail (item->prev == NULL); + + if (item->data && item->free_data) + item->free_data (item->data); + g_slice_free (RTPJitterBufferItem, item); +} diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index 820e73d946..0957f90c33 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -129,19 +129,27 @@ struct _RTPJitterBufferClass { * append. * @count: amount of seqnum in this item * @rtptime: rtp timestamp + * @data_free: Function to free @data (optional) * - * An object containing an RTP packet or event. + * An object containing an RTP packet or event. First members of this structure + * copied from GList so they can be inserted into lists without doing more + * allocations. */ struct _RTPJitterBufferItem { + /* a GList */ gpointer data; GList *next; GList *prev; + + /* item metadata */ guint type; GstClockTime dts; GstClockTime pts; guint seqnum; guint count; guint rtptime; + + GDestroyNotify free_data; }; GType rtp_jitter_buffer_get_type (void); @@ -197,5 +205,9 @@ gboolean rtp_jitter_buffer_can_fast_start (RTPJitterBuffer * jbuf gboolean rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf); +RTPJitterBufferItem * rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts, + GstClockTime pts, guint seqnum, guint count, + guint rtptime, GDestroyNotify free_data); +void rtp_jitter_buffer_free_item (RTPJitterBufferItem * item); #endif /* __RTP_JITTER_BUFFER_H__ */