rtpjitterbuffer: create specific API for appending buffers, events etc

To avoid specifying a bunch of mystic variables.
This commit is contained in:
Havard Graff 2020-03-30 22:26:33 +02:00 committed by GStreamer Merge Bot
parent 9f1062dc05
commit 3368ed44a3
3 changed files with 183 additions and 104 deletions

View file

@ -1067,20 +1067,6 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
}
#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
#define ITEM_TYPE_BUFFER 0
#define ITEM_TYPE_LOST 1
#define ITEM_TYPE_EVENT 2
#define ITEM_TYPE_QUERY 3
static inline RTPJitterBufferItem *
alloc_event_item (GstEvent * event)
{
return rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0,
-1, (GDestroyNotify) gst_mini_object_unref);
}
static void
free_item_and_retain_sticky_events (RTPJitterBufferItem * item,
gpointer user_data)
@ -1792,7 +1778,6 @@ static gboolean
queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
RTPJitterBufferItem *item;
gboolean head;
switch (GST_EVENT_TYPE (event)) {
@ -1832,10 +1817,8 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
break;
}
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_event_item (event);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
head = rtp_jitter_buffer_append_event (priv->jbuf, event);
if (head || priv->eos)
JBUF_SIGNAL_EVENT (priv);
@ -2388,9 +2371,7 @@ insert_lost_event (GstRtpJitterBuffer * jitterbuffer,
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event = NULL;
RTPJitterBufferItem *item;
guint next_in_seqnum;
gboolean head;
/* we had a gap and thus we lost some packets. Create an event for this. */
if (lost_packets > 1)
@ -2424,13 +2405,8 @@ insert_lost_event (GstRtpJitterBuffer * jitterbuffer,
"duration", G_TYPE_UINT64, duration,
"retry", G_TYPE_UINT, num_rtx_retry, NULL));
}
item = rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum,
lost_packets, -1, (GDestroyNotify) gst_mini_object_unref);
if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
/* Duplicate */
rtp_jitter_buffer_free_item (item);
if (head)
if (rtp_jitter_buffer_append_lost_event (priv->jbuf,
event, seqnum, lost_packets))
JBUF_SIGNAL_EVENT (priv);
}
@ -2747,7 +2723,6 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
GstFlowReturn ret = GST_FLOW_OK;
GList *events = NULL, *l;
GList *buffers;
gboolean head;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf,
@ -2776,10 +2751,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
*/
events = g_list_reverse (events);
for (l = events; l; l = l->next) {
RTPJitterBufferItem *item;
item = alloc_event_item (l->data);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
rtp_jitter_buffer_append_event (priv->jbuf, l->data);
}
g_list_free (events);
@ -2855,16 +2827,17 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstClockTime dts, pts;
guint64 latency_ts;
gboolean head;
gboolean duplicate;
gint percent = -1;
guint8 pt;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
gboolean do_next_seqnum = FALSE;
RTPJitterBufferItem *item;
GstMessage *msg = NULL;
GstMessage *drop_msg = NULL;
gboolean estimated_dts = FALSE;
gint32 packet_rate, max_dropout, max_misorder;
RtpTimer *timer = NULL;
gboolean is_rtx;
jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
@ -2878,6 +2851,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
is_rtx = GST_BUFFER_IS_RETRANSMISSION (buffer);
/* make sure we have PTS and DTS set */
pts = GST_BUFFER_PTS (buffer);
dts = GST_BUFFER_DTS (buffer);
@ -2909,8 +2884,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (jitterbuffer,
"Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
GST_BUFFER_IS_RETRANSMISSION (buffer));
seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx);
JBUF_LOCK_CHECK (priv, out_flushing);
@ -2948,7 +2922,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
if (G_UNLIKELY (priv->eos))
goto have_eos;
if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
if (!is_rtx)
calculate_jitter (jitterbuffer, dts, rtptime);
if (priv->seqnum_base != -1) {
@ -2985,7 +2959,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
max_dropout, max_misorder);
timer = rtp_timer_queue_find (priv->timers, seqnum);
if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
if (is_rtx) {
if (G_UNLIKELY (!priv->do_retransmission))
goto unsolicited_rtx;
@ -3069,7 +3043,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
pts =
rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
gap, GST_BUFFER_IS_RETRANSMISSION (buffer));
gap, is_rtx);
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
/* A valid timestamp cannot be calculated, discard packet */
@ -3104,7 +3078,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
priv->next_in_seqnum = (seqnum + 1) & 0xffff;
}
if (GST_BUFFER_IS_RETRANSMISSION (buffer))
if (is_rtx)
timer->num_rtx_received++;
/* At 2^15, we would detect a seqnum rollover too early, therefore
@ -3137,7 +3111,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
/* priv->last_popped_seqnum >= seqnum, we're too late. */
if (G_UNLIKELY (gap <= 0)) {
if (priv->do_retransmission) {
if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
if (is_rtx && timer) {
update_rtx_stats (jitterbuffer, timer, dts, FALSE);
/* Only count the retranmitted packet too late if it has been
* considered lost. If the original packet arrived before the
@ -3185,20 +3159,15 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* later. The code above always sets dts to pts or the other way around if
* any of those is valid in the buffer, so we know that if we estimated the
* dts that both are unknown */
if (estimated_dts)
item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER,
GST_CLOCK_TIME_NONE, pts, seqnum, 1, rtptime,
(GDestroyNotify) gst_mini_object_unref);
else
item = rtp_jitter_buffer_alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts,
seqnum, 1, rtptime, (GDestroyNotify) gst_mini_object_unref);
head = rtp_jitter_buffer_append_buffer (priv->jbuf, buffer,
estimated_dts ? GST_CLOCK_TIME_NONE : dts, pts, seqnum, rtptime,
&duplicate, &percent);
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
&percent))) {
if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
if (G_UNLIKELY (duplicate)) {
if (is_rtx && timer)
update_rtx_stats (jitterbuffer, timer, dts, FALSE);
goto duplicate;
}
@ -3208,8 +3177,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
head = TRUE;
/* update timers */
update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum, is_rtx, timer);
/* we had an unhandled SR, handle it now */
if (priv->last_sr)
@ -3284,7 +3252,6 @@ duplicate:
GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
rtp_jitter_buffer_free_item (item);
goto finished;
}
rtx_duplicate:
@ -3306,7 +3273,7 @@ discard_invalid:
{
GST_DEBUG_OBJECT (jitterbuffer,
"cannot calculate a valid pts for #%d (rtx: %d), discard",
seqnum, GST_BUFFER_IS_RETRANSMISSION (buffer));
seqnum, is_rtx);
gst_buffer_unref (buffer);
goto finished;
}
@ -4366,17 +4333,11 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
}
default:
if (GST_QUERY_IS_SERIALIZED (query)) {
RTPJitterBufferItem *item;
gboolean head;
JBUF_LOCK_CHECK (priv, out_flushing);
if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
RTP_JITTER_BUFFER_MODE_BUFFER) {
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
item = rtp_jitter_buffer_alloc_item (query, ITEM_TYPE_QUERY, -1, -1,
-1, 0, -1, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
if (head)
if (rtp_jitter_buffer_append_query (priv->jbuf, query))
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUERY (priv, out_flushing);
res = priv->last_query;

View file

@ -989,7 +989,7 @@ done:
*
* Returns: %FALSE if a packet with the same number already existed.
*/
gboolean
static gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
gboolean * head, gint * percent)
{
@ -1067,10 +1067,156 @@ duplicate:
GST_DEBUG ("duplicate packet %d found", (gint) seqnum);
if (G_LIKELY (head))
*head = FALSE;
if (percent)
*percent = -1;
return FALSE;
}
}
/**
* rtp_jitter_buffer_alloc_item:
* @data: The data stored in this item
* @type: User specific item type
* @dts: Decoding Timestamp
* @pts: Presentation Timestamp
* @seqnum: Sequence number
* @count: Number of packet this item represent
* @rtptime: The RTP specific timestamp
* @free_data: A function to free @data (optional)
*
* Create an item that can then be stored in the jitter buffer.
*
* Returns: a newly allocated RTPJitterbufferItem
*/
static RTPJitterBufferItem *
rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts,
GstClockTime pts, guint seqnum, guint count, guint rtptime,
GDestroyNotify free_data)
{
RTPJitterBufferItem *item;
item = g_slice_new (RTPJitterBufferItem);
item->data = data;
item->next = NULL;
item->prev = NULL;
item->type = type;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
item->count = count;
item->rtptime = rtptime;
item->free_data = free_data;
return item;
}
static inline RTPJitterBufferItem *
alloc_event_item (GstEvent * event)
{
return rtp_jitter_buffer_alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0,
-1, (GDestroyNotify) gst_mini_object_unref);
}
/**
* rtp_jitter_buffer_append_event:
* @jbuf: an #RTPJitterBuffer
* @event: an #GstEvent to insert
* Inserts @event into the packet queue of @jbuf.
*
* Returns: %TRUE if the event is at the head of the queue
*/
gboolean
rtp_jitter_buffer_append_event (RTPJitterBuffer * jbuf, GstEvent * event)
{
RTPJitterBufferItem *item = alloc_event_item (event);
gboolean head;
rtp_jitter_buffer_insert (jbuf, item, &head, NULL);
return head;
}
/**
* rtp_jitter_buffer_append_query:
* @jbuf: an #RTPJitterBuffer
* @query: an #GstQuery to insert
* Inserts @query into the packet queue of @jbuf.
*
* Returns: %TRUE if the query is at the head of the queue
*/
gboolean
rtp_jitter_buffer_append_query (RTPJitterBuffer * jbuf, GstQuery * query)
{
RTPJitterBufferItem *item =
rtp_jitter_buffer_alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1,
NULL);
gboolean head;
rtp_jitter_buffer_insert (jbuf, item, &head, NULL);
return head;
}
/**
* rtp_jitter_buffer_append_lost_event:
* @jbuf: an #RTPJitterBuffer
* @event: an #GstEvent to insert
* @seqnum: Sequence number
* @lost_packets: Number of lost packet this item represent
* Inserts @event into the packet queue of @jbuf.
*
* Returns: %TRUE if the event is at the head of the queue
*/
gboolean
rtp_jitter_buffer_append_lost_event (RTPJitterBuffer * jbuf, GstEvent * event,
guint16 seqnum, guint lost_packets)
{
RTPJitterBufferItem *item = rtp_jitter_buffer_alloc_item (event,
ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1,
(GDestroyNotify) gst_mini_object_unref);
gboolean head;
if (!rtp_jitter_buffer_insert (jbuf, item, &head, NULL)) {
/* Duplicate */
rtp_jitter_buffer_free_item (item);
head = FALSE;
}
return head;
}
/**
* rtp_jitter_buffer_append_buffer:
* @jbuf: an #RTPJitterBuffer
* @buf: an #GstBuffer to insert
* @seqnum: Sequence number
* @duplicate: TRUE when the packet inserted is a duplicate
* @percent: the buffering percent after insertion
*
* Inserts @buf into the packet queue of @jbuf.
*
* Returns: %TRUE if the buffer is at the head of the queue
*/
gboolean
rtp_jitter_buffer_append_buffer (RTPJitterBuffer * jbuf, GstBuffer * buf,
GstClockTime dts, GstClockTime pts, guint16 seqnum, guint rtptime,
gboolean * duplicate, gint * percent)
{
RTPJitterBufferItem *item = rtp_jitter_buffer_alloc_item (buf,
ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime,
(GDestroyNotify) gst_mini_object_unref);
gboolean head;
gboolean inserted;
inserted = rtp_jitter_buffer_insert (jbuf, item, &head, percent);
if (!inserted)
rtp_jitter_buffer_free_item (item);
if (duplicate)
*duplicate = !inserted;
return head;
}
/**
* rtp_jitter_buffer_pop:
* @jbuf: an #RTPJitterBuffer
@ -1383,42 +1529,6 @@ rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf)
rtp_jitter_buffer_num_packets (jbuf) > 10000;
}
/**
* rtp_jitter_buffer_alloc_item:
* @data: The data stored in this item
* @type: User specific item type
* @dts: Decoding Timestamp
* @pts: Presentation Timestamp
* @seqnum: Sequence number
* @count: Number of packet this item represent
* @rtptime: The RTP specific timestamp
* @free_data: A function to free @data (optional)
*
* Create an item that can then be stored in the jitter buffer.
*
* Returns: a newly allocated RTPJitterbufferItem
*/
RTPJitterBufferItem *
rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts,
GstClockTime pts, guint seqnum, guint count, guint rtptime,
GDestroyNotify free_data)
{
RTPJitterBufferItem *item;
item = g_slice_new (RTPJitterBufferItem);
item->data = data;
item->next = NULL;
item->prev = NULL;
item->type = type;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
item->count = count;
item->rtptime = rtptime;
item->free_data = free_data;
return item;
}
/**
* rtp_jitter_buffer_free_item:

View file

@ -116,6 +116,12 @@ struct _RTPJitterBufferClass {
GObjectClass parent_class;
};
#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
#define ITEM_TYPE_BUFFER 0
#define ITEM_TYPE_LOST 1
#define ITEM_TYPE_EVENT 2
#define ITEM_TYPE_QUERY 3
/**
* RTPJitterBufferItem:
* @data: the data of the item
@ -174,9 +180,14 @@ void rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer *jbuf,
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
RTPJitterBufferItem *item,
gboolean *head, gint *percent);
gboolean rtp_jitter_buffer_append_event (RTPJitterBuffer * jbuf, GstEvent * event);
gboolean rtp_jitter_buffer_append_query (RTPJitterBuffer * jbuf, GstQuery * query);
gboolean rtp_jitter_buffer_append_lost_event (RTPJitterBuffer * jbuf, GstEvent * event,
guint16 seqnum, guint lost_packets);
gboolean rtp_jitter_buffer_append_buffer (RTPJitterBuffer * jbuf, GstBuffer * buf,
GstClockTime dts, GstClockTime pts,
guint16 seqnum, guint rtptime,
gboolean * duplicate, gint * percent);
void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled);
@ -205,9 +216,6 @@ gboolean rtp_jitter_buffer_can_fast_start (RTPJitterBuffer * jbuf
gboolean rtp_jitter_buffer_is_full (RTPJitterBuffer * jbuf);
RTPJitterBufferItem * rtp_jitter_buffer_alloc_item (gpointer data, guint type, GstClockTime dts,
GstClockTime pts, guint seqnum, guint count,
guint rtptime, GDestroyNotify free_data);
void rtp_jitter_buffer_free_item (RTPJitterBufferItem * item);
#endif /* __RTP_JITTER_BUFFER_H__ */