rtpjitterbuffer: Move item structure outside of the element

This moves the RtpJitterBufferStructure type, alloc, free into
rtpjitterbuffer.c/h implementation. jitterbuffer.c strictly rely on
the fact this structure is compatible with GList, and so it make more
sense to keep encapsulate it. Also, anything that could possibly
reduce the amount of code in the element is a win.

In order to support that move, a function pointer to free the data
was added. This also allow making the free function option when
flushing the jitterbuffer.
This commit is contained in:
Nicolas Dufresne 2019-06-06 14:44:27 -04:00
parent 9b706b6220
commit c917f11ae8
3 changed files with 105 additions and 50 deletions

View file

@ -1042,48 +1042,26 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
#define ITEM_TYPE_EVENT 2 #define ITEM_TYPE_EVENT 2
#define ITEM_TYPE_QUERY 3 #define ITEM_TYPE_QUERY 3
static RTPJitterBufferItem * static inline RTPJitterBufferItem *
alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, alloc_event_item (GstEvent * event)
guint seqnum, guint count, guint rtptime)
{ {
RTPJitterBufferItem *item; return rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0,
-1, (GDestroyNotify) gst_mini_object_unref);
item = g_slice_new (RTPJitterBufferItem);
item->data = data;
item->next = NULL;
item->prev = NULL;
item->type = type;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
item->count = count;
item->rtptime = rtptime;
return item;
} }
static void static void
free_item (RTPJitterBufferItem * item) free_item_and_retain_sticky_events (RTPJitterBufferItem * item,
{ gpointer user_data)
g_return_if_fail (item != NULL);
if (item->data && item->type != ITEM_TYPE_QUERY)
gst_mini_object_unref (item->data);
g_slice_free (RTPJitterBufferItem, item);
}
static void
free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
{ {
GList **l = user_data; GList **l = user_data;
if (item->data && item->type == ITEM_TYPE_EVENT if (item->data && item->type == ITEM_TYPE_EVENT
&& GST_EVENT_IS_STICKY (item->data)) { && GST_EVENT_IS_STICKY (item->data)) {
*l = g_list_prepend (*l, item->data); *l = g_list_prepend (*l, item->data);
} else if (item->data && item->type != ITEM_TYPE_QUERY) { item->data = NULL;
gst_mini_object_unref (item->data);
} }
g_slice_free (RTPJitterBufferItem, item);
rtp_jitter_buffer_free_item (item);
} }
static void static void
@ -1103,7 +1081,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
g_cond_clear (&priv->jbuf_event); g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query); g_cond_clear (&priv->jbuf_query);
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets); g_queue_clear (&priv->gap_packets);
g_object_unref (priv->jbuf); g_object_unref (priv->jbuf);
@ -1584,7 +1562,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
priv->equidistant = 0; priv->equidistant = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID; priv->segment_seqnum = GST_SEQNUM_INVALID;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE); rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
rtp_jitter_buffer_reset_skew (priv->jbuf); rtp_jitter_buffer_reset_skew (priv->jbuf);
remove_all_timers (jitterbuffer); remove_all_timers (jitterbuffer);
@ -1802,7 +1780,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
GST_DEBUG_OBJECT (jitterbuffer, "adding event"); GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); item = alloc_event_item (event);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
if (head || priv->eos) if (head || priv->eos)
JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_EVENT (priv);
@ -2790,7 +2768,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, rtp_jitter_buffer_flush (priv->jbuf,
(GFunc) free_item_and_retain_events, &events); (GFunc) free_item_and_retain_sticky_events, &events);
rtp_jitter_buffer_reset_skew (priv->jbuf); rtp_jitter_buffer_reset_skew (priv->jbuf);
remove_all_timers (jitterbuffer); remove_all_timers (jitterbuffer);
priv->discont = TRUE; priv->discont = TRUE;
@ -2817,7 +2795,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
for (l = events; l; l = l->next) { for (l = events; l; l = l->next) {
RTPJitterBufferItem *item; RTPJitterBufferItem *item;
item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); item = alloc_event_item (l->data);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
} }
g_list_free (events); g_list_free (events);
@ -3207,7 +3185,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_item); old_item);
priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff; priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
free_item (old_item); rtp_jitter_buffer_free_item (old_item);
} }
/* we might have removed some head buffers, signal the pushing thread to /* we might have removed some head buffers, signal the pushing thread to
* see if it can push now */ * see if it can push now */
@ -3220,11 +3198,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* any of those is valid in the buffer, so we know that if we estimated the * any of those is valid in the buffer, so we know that if we estimated the
* dts that both are unknown */ * dts that both are unknown */
if (estimated_dts) if (estimated_dts)
item = item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER,
alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, pts, seqnum, 1, rtptime,
pts, seqnum, 1, rtptime); (GDestroyNotify) gst_mini_object_unref);
else else
item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime); item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts,
seqnum, 1, rtptime, (GDestroyNotify) gst_mini_object_unref);
/* now insert the packet into the queue in sorted order. This function returns /* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we * FALSE if a packet with the same seqnum was already in the queue, meaning we
@ -3326,7 +3305,7 @@ duplicate:
GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum); seqnum);
priv->num_duplicates++; priv->num_duplicates++;
free_item (item); rtp_jitter_buffer_free_item (item);
goto finished; goto finished;
} }
rtx_duplicate: rtx_duplicate:
@ -3565,7 +3544,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
JBUF_UNLOCK (priv); JBUF_UNLOCK (priv);
item->data = NULL; item->data = NULL;
free_item (item); rtp_jitter_buffer_free_item (item);
if (msg) if (msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg); gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
@ -3685,7 +3664,7 @@ handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum); seqnum, next_seqnum);
item = rtp_jitter_buffer_pop (priv->jbuf, NULL); item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
free_item (item); rtp_jitter_buffer_free_item (item);
result = GST_FLOW_OK; result = GST_FLOW_OK;
} else { } else {
/* the chain function has scheduled timers to request retransmission or /* the chain function has scheduled timers to request retransmission or
@ -3968,10 +3947,11 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
"duration", G_TYPE_UINT64, duration, "duration", G_TYPE_UINT64, duration,
"retry", G_TYPE_UINT, num_rtx_retry, NULL)); "retry", G_TYPE_UINT, num_rtx_retry, NULL));
} }
item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1); item = rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum,
lost_packets, -1, (GDestroyNotify) gst_mini_object_unref);
if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL)) if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
/* Duplicate */ /* Duplicate */
free_item (item); rtp_jitter_buffer_free_item (item);
if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) { if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
/* Store info to update stats if the packet arrives too late */ /* Store info to update stats if the packet arrives too late */
@ -4478,7 +4458,8 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
if (rtp_jitter_buffer_get_mode (priv->jbuf) != if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
RTP_JITTER_BUFFER_MODE_BUFFER) { RTP_JITTER_BUFFER_MODE_BUFFER) {
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query"); GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1); item = rtp_jitter_buffer_alloc_item (query, ITEM_TYPE_QUERY, -1, -1,
-1, 0, -1, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
if (head) if (head)
JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_EVENT (priv);

View file

@ -1108,6 +1108,10 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
else if (percent) else if (percent)
*percent = -1; *percent = -1;
/* let's clear the pointers so we can ensure we don't free items that are
* still in the jitterbuffer */
item->next = item->prev = NULL;
return (RTPJitterBufferItem *) item; return (RTPJitterBufferItem *) item;
} }
@ -1133,7 +1137,7 @@ rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf)
/** /**
* rtp_jitter_buffer_flush: * rtp_jitter_buffer_flush:
* @jbuf: an #RTPJitterBuffer * @jbuf: an #RTPJitterBuffer
* @free_func: function to free each item * @free_func: function to free each item (optional)
* @user_data: user data passed to @free_func * @user_data: user data passed to @free_func
* *
* Flush all packets from the jitterbuffer. * Flush all packets from the jitterbuffer.
@ -1145,7 +1149,9 @@ rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf, GFunc free_func,
GList *item; GList *item;
g_return_if_fail (jbuf != NULL); g_return_if_fail (jbuf != NULL);
g_return_if_fail (free_func != NULL);
if (free_func == NULL)
free_func = (GFunc) rtp_jitter_buffer_free_item;
while ((item = g_queue_pop_head_link (jbuf->packets))) while ((item = g_queue_pop_head_link (jbuf->packets)))
free_func ((RTPJitterBufferItem *) item, user_data); free_func ((RTPJitterBufferItem *) item, user_data);
@ -1376,3 +1382,59 @@ rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf)
return rtp_jitter_buffer_get_seqnum_diff (jbuf) >= 32765 && return rtp_jitter_buffer_get_seqnum_diff (jbuf) >= 32765 &&
rtp_jitter_buffer_num_packets (jbuf) > 10000; rtp_jitter_buffer_num_packets (jbuf) > 10000;
} }
/**
* rtp_jitter_buffer_alloc_item:
* @data: The data stored in this item
* @type: User specific item type
* @dts: Decoding Timestamp
* @pts: Presentation Timestamp
* @seqnum: Sequence number
* @count: Number of packet this item represent
* @rtptime: The RTP specific timestamp
* @free_data: A function to free @data (optional)
*
* Create an item that can then be stored in the jitter buffer.
*
* Returns: a newly allocated RTPJitterbufferItem
*/
RTPJitterBufferItem *
rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts,
GstClockTime pts, guint seqnum, guint count, guint rtptime,
GDestroyNotify free_data)
{
RTPJitterBufferItem *item;
item = g_slice_new (RTPJitterBufferItem);
item->data = data;
item->next = NULL;
item->prev = NULL;
item->type = type;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
item->count = count;
item->rtptime = rtptime;
item->free_data = free_data;
return item;
}
/**
* rtp_jitter_buffer_free_item:
* @item: the item to be freed
*
* Free the jitter buffer item.
*/
void
rtp_jitter_buffer_free_item (RTPJitterBufferItem * item)
{
g_return_if_fail (item != NULL);
/* needs to be unlinked first */
g_return_if_fail (item->next == NULL);
g_return_if_fail (item->prev == NULL);
if (item->data && item->free_data)
item->free_data (item->data);
g_slice_free (RTPJitterBufferItem, item);
}

View file

@ -129,19 +129,27 @@ struct _RTPJitterBufferClass {
* append. * append.
* @count: amount of seqnum in this item * @count: amount of seqnum in this item
* @rtptime: rtp timestamp * @rtptime: rtp timestamp
* @data_free: Function to free @data (optional)
* *
* An object containing an RTP packet or event. * An object containing an RTP packet or event. First members of this structure
* copied from GList so they can be inserted into lists without doing more
* allocations.
*/ */
struct _RTPJitterBufferItem { struct _RTPJitterBufferItem {
/* a GList */
gpointer data; gpointer data;
GList *next; GList *next;
GList *prev; GList *prev;
/* item metadata */
guint type; guint type;
GstClockTime dts; GstClockTime dts;
GstClockTime pts; GstClockTime pts;
guint seqnum; guint seqnum;
guint count; guint count;
guint rtptime; guint rtptime;
GDestroyNotify free_data;
}; };
GType rtp_jitter_buffer_get_type (void); GType rtp_jitter_buffer_get_type (void);
@ -197,5 +205,9 @@ gboolean rtp_jitter_buffer_can_fast_start (RTPJitterBuffer * jbuf
gboolean rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf); gboolean rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf);
RTPJitterBufferItem * rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts,
GstClockTime pts, guint seqnum, guint count,
guint rtptime, GDestroyNotify free_data);
void rtp_jitter_buffer_free_item (RTPJitterBufferItem * item);
#endif /* __RTP_JITTER_BUFFER_H__ */ #endif /* __RTP_JITTER_BUFFER_H__ */