gstrtpjitterbuffer: Custom messages when dropping packets

This commit adds custom element messages for when gstrtpjitterbuffer
drops an incoming rtp packets due to for example arriving too late.
Applications can listen to these messages on the bus which enables
actions to be taken when packets are dropped due to for example high
network jitter.

Two properties has been added, one to enable posting drop messages and
one to set a minimum time between each message to enable throttling the
posting of messages as high drop rates.
This commit is contained in:
Simon Arnling Bååth 2019-10-04 20:31:56 +00:00 committed by Mathieu Duponchelle
parent a55576d1fd
commit 8173596ed2
2 changed files with 455 additions and 1 deletions

View file

@ -134,6 +134,8 @@ enum
#define DEFAULT_TS_OFFSET 0
#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
#define DEFAULT_DO_LOST FALSE
#define DEFAULT_POST_DROP_MESSAGES FALSE
#define DEFAULT_DROP_MESSAGES_INTERVAL_MS 200
#define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_PERCENT 0
#define DEFAULT_DO_RETRANSMISSION FALSE
@ -164,6 +166,8 @@ enum
PROP_TS_OFFSET,
PROP_MAX_TS_OFFSET_ADJUSTMENT,
PROP_DO_LOST,
PROP_POST_DROP_MESSAGES,
PROP_DROP_MESSAGES_INTERVAL,
PROP_MODE,
PROP_PERCENT,
PROP_DO_RETRANSMISSION,
@ -297,6 +301,8 @@ struct _GstRtpJitterBufferPrivate
gint64 ts_offset;
guint64 max_ts_offset_adjustment;
gboolean do_lost;
gboolean post_drop_messages;
guint drop_messages_interval_ms;
gboolean do_retransmission;
gboolean rtx_next_seqnum;
gint rtx_delay;
@ -388,7 +394,20 @@ struct _GstRtpJitterBufferPrivate
GstClockTime last_pts;
guint64 last_rtptime;
GstClockTime avg_jitter;
/* for dropped packet messages */
GstClockTime last_drop_msg_timestamp;
/* accumulators; reset every time a drop message is posted */
guint num_too_late;
guint num_already_lost;
guint num_drop_on_latency;
};
typedef enum
{
REASON_TOO_LATE,
REASON_ALREADY_LOST,
REASON_DROP_ON_LATENCY
} DropMessageReason;
static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
@ -490,6 +509,9 @@ static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
const RtpTimer * timer, GstClockTime dts, gboolean success);
static GstClockTime get_current_running_time (GstRtpJitterBuffer *
jitterbuffer);
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
@ -561,6 +583,49 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
"Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpJitterBuffer:post-drop-messages:
*
* Post custom messages to the bus when a packet is dropped by the
* jitterbuffer due to arriving too late, being already considered lost,
* or being dropped due to the drop-on-latency property being enabled.
* Message is of type GST_MESSAGE_ELEMENT and contains a GstStructure named
* "drop-msg" with the following fields:
*
* * #guint `seqnum`: Seqnum of dropped packet.
* * #guint64 `timestamp`: PTS timestamp of dropped packet.
* * #gstring `reason`: Reason for dropping the packet.
* * #guint `num-too-late`: Number of packets arriving too late since
* last drop message.
* * #guint `num-already-lost`: Number of packets already considered lost
* since drop message.
* * #guint `num-drop-on-latency`: Number of packets dropped due to the
* drop-on-latency property since last drop message.
*
* Since: 1.18
*/
g_object_class_install_property (gobject_class, PROP_POST_DROP_MESSAGES,
g_param_spec_boolean ("post-drop-messages", "Post drop messages",
"Post a custom message to the bus when a packet is dropped by the jitterbuffer",
DEFAULT_POST_DROP_MESSAGES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpJitterBuffer:drop-messages-interval:
*
* Minimal time in milliseconds between posting dropped packet messages, if enabled
* by setting property by setting #GstRtpJitterBuffer:post-drop-messages to %TRUE.
* If interval is set to 0, every dropped packet will result in a drop message being posted.
*
* Since: 1.18
*/
g_object_class_install_property (gobject_class, PROP_DROP_MESSAGES_INTERVAL,
g_param_spec_uint ("drop-messages-interval",
"Drop message interval",
"Minimal time between posting dropped packet messages", 0,
G_MAXUINT, DEFAULT_DROP_MESSAGES_INTERVAL_MS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpJitterBuffer:mode:
*
@ -935,6 +1000,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->ts_offset = DEFAULT_TS_OFFSET;
priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
priv->do_lost = DEFAULT_DO_LOST;
priv->post_drop_messages = DEFAULT_POST_DROP_MESSAGES;
priv->drop_messages_interval_ms = DEFAULT_DROP_MESSAGES_INTERVAL_MS;
priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
priv->rtx_delay = DEFAULT_RTX_DELAY;
@ -956,6 +1023,10 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->last_pts = -1;
priv->last_rtptime = -1;
priv->avg_jitter = 0;
priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
priv->num_too_late = 0;
priv->num_already_lost = 0;
priv->num_drop_on_latency = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->timers = rtp_timer_queue_new ();
priv->rtx_stats_timers = rtp_timer_queue_new ();
@ -1530,6 +1601,10 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
priv->last_in_pts = 0;
priv->equidistant = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
priv->num_too_late = 0;
priv->num_already_lost = 0;
priv->num_drop_on_latency = 0;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
@ -1937,6 +2012,59 @@ check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
return message;
}
/* call with jbuf lock held */
static GstMessage *
new_drop_message (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
GstClockTime timestamp, DropMessageReason reason)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstMessage *drop_msg = NULL;
GstStructure *s;
GstClockTime current_time;
GstClockTime time_diff;
const gchar *reason_str;
current_time = get_current_running_time (jitterbuffer);
time_diff = current_time - priv->last_drop_msg_timestamp;
if (reason == REASON_TOO_LATE) {
priv->num_too_late++;
reason_str = "too-late";
} else if (reason == REASON_ALREADY_LOST) {
priv->num_already_lost++;
reason_str = "already-lost";
} else if (reason == REASON_DROP_ON_LATENCY) {
priv->num_drop_on_latency++;
reason_str = "drop-on-latency";
} else {
GST_WARNING_OBJECT (jitterbuffer, "Invalid reason for drop message");
return drop_msg;
}
/* Only create new drop_msg if time since last drop_msg is larger that
* that the set interval, or if it is the first drop message posted */
if ((time_diff >= priv->drop_messages_interval_ms * GST_MSECOND) ||
(priv->last_drop_msg_timestamp == GST_CLOCK_TIME_NONE)) {
s = gst_structure_new ("drop-msg",
"seqnum", G_TYPE_UINT, seqnum,
"timestamp", GST_TYPE_CLOCK_TIME, timestamp,
"reason", G_TYPE_STRING, reason_str,
"num-too-late", G_TYPE_UINT, priv->num_too_late,
"num-already-lost", G_TYPE_UINT, priv->num_already_lost,
"num-drop-on-latency", G_TYPE_UINT, priv->num_drop_on_latency, NULL);
priv->last_drop_msg_timestamp = current_time;
priv->num_too_late = 0;
priv->num_already_lost = 0;
priv->num_drop_on_latency = 0;
drop_msg = gst_message_new_element (GST_OBJECT (jitterbuffer), s);
}
return drop_msg;
}
static inline GstClockTimeDiff
timeout_offset (GstRtpJitterBuffer * jitterbuffer)
{
@ -2689,6 +2817,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
gboolean do_next_seqnum = FALSE;
RTPJitterBufferItem *item;
GstMessage *msg = NULL;
GstMessage *drop_msg = NULL;
gboolean estimated_dts = FALSE;
gint32 packet_rate, max_dropout, max_misorder;
RtpTimer *timer = NULL;
@ -2997,6 +3126,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_item);
priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
if (priv->post_drop_messages) {
drop_msg =
new_drop_message (jitterbuffer, old_item->seqnum, old_item->pts,
REASON_DROP_ON_LATENCY);
}
rtp_jitter_buffer_free_item (old_item);
}
/* we might have removed some head buffers, signal the pushing thread to
@ -3057,6 +3191,8 @@ finished:
if (msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
if (drop_msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), drop_msg);
return ret;
@ -3095,6 +3231,9 @@ too_late:
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
" popped, dropping", seqnum, priv->last_popped_seqnum);
priv->num_late++;
if (priv->post_drop_messages) {
drop_msg = new_drop_message (jitterbuffer, seqnum, pts, REASON_TOO_LATE);
}
gst_buffer_unref (buffer);
goto finished;
}
@ -3103,6 +3242,10 @@ already_lost:
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
"considered lost", seqnum);
priv->num_late++;
if (priv->post_drop_messages) {
drop_msg =
new_drop_message (jitterbuffer, seqnum, pts, REASON_ALREADY_LOST);
}
gst_buffer_unref (buffer);
goto finished;
}
@ -4434,6 +4577,16 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
priv->do_lost = g_value_get_boolean (value);
JBUF_UNLOCK (priv);
break;
case PROP_POST_DROP_MESSAGES:
JBUF_LOCK (priv);
priv->post_drop_messages = g_value_get_boolean (value);
JBUF_UNLOCK (priv);
break;
case PROP_DROP_MESSAGES_INTERVAL:
JBUF_LOCK (priv);
priv->drop_messages_interval_ms = g_value_get_uint (value);
JBUF_UNLOCK (priv);
break;
case PROP_MODE:
JBUF_LOCK (priv);
rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
@ -4562,6 +4715,16 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
g_value_set_boolean (value, priv->do_lost);
JBUF_UNLOCK (priv);
break;
case PROP_POST_DROP_MESSAGES:
JBUF_LOCK (priv);
g_value_set_boolean (value, priv->post_drop_messages);
JBUF_UNLOCK (priv);
break;
case PROP_DROP_MESSAGES_INTERVAL:
JBUF_LOCK (priv);
g_value_set_uint (value, priv->drop_messages_interval_ms);
JBUF_UNLOCK (priv);
break;
case PROP_MODE:
JBUF_LOCK (priv);
g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));

View file

@ -899,7 +899,7 @@ GST_START_TEST (test_late_packets_still_makes_lost_events)
generate_test_buffer_full (now,
seqnum, seqnum * TEST_RTP_TS_DURATION)));
/* we should now receive packet-lost-events for the gap
/* we should now receive packet-lost-events for the gap
* FIXME: The timeout and duration here are a bit crap...
*/
verify_lost_event (h, next_seqnum, 3400 * GST_MSECOND, 6500 * GST_MSECOND);
@ -3018,6 +3018,292 @@ GST_START_TEST (test_timer_queue_timer_offset)
GST_END_TEST;
static gboolean
check_drop_message (GstMessage * drop_msg, const char *reason_check,
guint seqnum_check, guint num_msg)
{
const GstStructure *s = gst_message_get_structure (drop_msg);
const gchar *reason_str;
GstClockTime timestamp;
guint seqnum;
guint num_too_late;
guint num_already_lost;
guint num_drop_on_latency;
guint num_too_late_check = 0;
guint num_already_lost_check = 0;
guint num_drop_on_latency_check = 0;
/* Check that fields exist */
fail_unless (gst_structure_get_uint (s, "seqnum", &seqnum));
fail_unless (gst_structure_get_uint64 (s, "timestamp", &timestamp));
fail_unless (gst_structure_get_uint (s, "num-too-late", &num_too_late));
fail_unless (gst_structure_get_uint (s, "num-already-lost",
&num_already_lost));
fail_unless (gst_structure_get_uint (s, "num-drop-on-latency",
&num_drop_on_latency));
fail_unless (reason_str = gst_structure_get_string (s, "reason"));
/* Assing what to compare message fields to based on message reason */
if (g_strcmp0 (reason_check, "too-late") == 0) {
num_too_late_check += num_msg;
} else if (g_strcmp0 (reason_check, "already-lost") == 0) {
num_already_lost_check += num_msg;
} else if (g_strcmp0 (reason_check, "drop-on-latency") == 0) {
num_drop_on_latency_check += num_msg;
} else {
return FALSE;
}
/* Check that fields have correct value */
fail_unless (seqnum == seqnum_check);
fail_unless (g_strcmp0 (reason_str, reason_check) == 0);
fail_unless (num_too_late == num_too_late_check);
fail_unless (num_already_lost == num_already_lost_check);
fail_unless (num_drop_on_latency == num_drop_on_latency_check);
return TRUE;
}
GST_START_TEST (test_drop_messages_too_late)
{
GstHarness *h = gst_harness_new ("rtpjitterbuffer");
gint latency_ms = 100;
guint next_seqnum;
GstBus *bus;
GstMessage *drop_msg;
gboolean have_message = FALSE;
g_object_set (h->element, "post-drop-messages", TRUE, NULL);
next_seqnum = construct_deterministic_initial_state (h, latency_ms);
/* Create a bus to get the drop message on */
bus = gst_bus_new ();
gst_element_set_bus (h->element, bus);
/* Push test buffer resulting in gap of one */
push_test_buffer (h, next_seqnum + 1);
/* Advance time to trigger timeout of the missing buffer */
gst_harness_crank_single_clock_wait (h);
/* Pull out and unref pushed buffer */
gst_buffer_unref (gst_harness_pull (h));
/* Push missing buffer, now arriving "too-late" */
fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h,
generate_test_buffer (next_seqnum)));
/* Pop the resulting drop message and check its correctness */
while (!have_message &&
(drop_msg = gst_bus_pop_filtered (bus, GST_MESSAGE_ELEMENT)) != NULL) {
if (gst_message_has_name (drop_msg, "drop-msg")) {
fail_unless (check_drop_message (drop_msg, "too-late", next_seqnum, 1));
have_message = TRUE;
}
gst_message_unref (drop_msg);
}
fail_unless (have_message);
/* Cleanup */
gst_element_set_bus (h->element, NULL);
gst_object_unref (bus);
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_drop_messages_already_lost)
{
GstHarness *h = gst_harness_new ("rtpjitterbuffer");
GstTestClock *testclock;
GstClockID id;
gint latency_ms = 20;
guint seqnum_late;
guint seqnum_final;
GstBus *bus;
GstMessage *drop_msg;
gboolean have_message = FALSE;
testclock = gst_harness_get_testclock (h);
g_object_set (h->element, "post-drop-messages", TRUE, NULL);
/* Create a bus to get the drop message on */
bus = gst_bus_new ();
gst_element_set_bus (h->element, bus);
/* Get seqnum from initial state */
seqnum_late = construct_deterministic_initial_state (h, latency_ms);
/* Hop over 3 buffers and push buffer (gap of 3) */
seqnum_final = seqnum_late + 4;
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h,
generate_test_buffer_full (seqnum_final * TEST_BUF_DURATION,
seqnum_final, seqnum_final * TEST_RTP_TS_DURATION)));
/* The jitterbuffer should be waiting for the timeout of a "large gap timer"
* for buffer seqnum_late and seqnum_late+1 */
gst_test_clock_wait_for_next_pending_id (testclock, &id);
fail_unless_equals_uint64 (seqnum_late * TEST_BUF_DURATION +
latency_ms * GST_MSECOND, gst_clock_id_get_time (id));
/* Now seqnum_late sneaks in before the lost event for buffer seqnum_late and seqnum_late+1 is
* processed. It will be dropped due to already having been considered lost */
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h,
generate_test_buffer_full (seqnum_late * TEST_BUF_DURATION,
seqnum_late, seqnum_late * TEST_RTP_TS_DURATION)));
/* Pop the resulting drop message and check its correctness */
while (!have_message &&
(drop_msg = gst_bus_pop_filtered (bus, GST_MESSAGE_ELEMENT)) != NULL) {
if (gst_message_has_name (drop_msg, "drop-msg")) {
fail_unless (check_drop_message (drop_msg, "already-lost", seqnum_late,
1));
have_message = TRUE;
}
gst_message_unref (drop_msg);
}
fail_unless (have_message);
/* Cleanup */
gst_clock_id_unref (id);
gst_element_set_bus (h->element, NULL);
gst_buffer_unref (gst_harness_take_all_data_as_buffer (h));
gst_object_unref (bus);
gst_object_unref (testclock);
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_drop_messages_drop_on_latency)
{
GstHarness *h = gst_harness_new ("rtpjitterbuffer");
gint latency_ms = 20;
guint next_seqnum;
guint first_seqnum;
guint final_seqnum;
GstBus *bus;
GstMessage *drop_msg;
gboolean have_message = FALSE;
g_object_set (h->element, "post-drop-messages", TRUE, NULL);
g_object_set (h->element, "drop-on-latency", TRUE, NULL);
next_seqnum = construct_deterministic_initial_state (h, latency_ms);
/* Create a bus to get the drop message on */
bus = gst_bus_new ();
gst_element_set_bus (h->element, bus);
/* Push 3 buffers in correct seqnum order with initial gap of 1, with the buffers
* arriving simultaneously in harness time. First buffer will wait for gap buffer,
* and the third arriving buffer will trigger the first to be dropped due to
* drop-on-latency.
*/
first_seqnum = ++next_seqnum;
final_seqnum = next_seqnum + 2;
for (; next_seqnum <= final_seqnum; next_seqnum++) {
fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h,
generate_test_buffer_full (next_seqnum * TEST_BUF_DURATION,
next_seqnum, next_seqnum * TEST_RTP_TS_DURATION)));
}
/* Pop the resulting drop message and check its correctness */
while (!have_message &&
(drop_msg = gst_bus_pop_filtered (bus, GST_MESSAGE_ELEMENT)) != NULL) {
if (gst_message_has_name (drop_msg, "drop-msg")) {
fail_unless (check_drop_message (drop_msg, "drop-on-latency",
first_seqnum, 1));
have_message = TRUE;
}
gst_message_unref (drop_msg);
}
fail_unless (have_message);
/* Cleanup */
gst_element_set_bus (h->element, NULL);
gst_object_unref (bus);
gst_buffer_unref (gst_harness_take_all_data_as_buffer (h));
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_drop_messages_interval)
{
GstHarness *h = gst_harness_new ("rtpjitterbuffer");
guint latency_ms = 100;
GstClockTime interval = 10;
guint next_seqnum;
guint final_seqnum;
GstBus *bus;
GstMessage *drop_msg;
GstClockType now;
guint num_late_not_sent = 0;
guint num_sent_msg = 0;
g_object_set (h->element, "post-drop-messages", TRUE, NULL);
g_object_set (h->element, "drop-messages-interval", interval, NULL);
next_seqnum = construct_deterministic_initial_state (h, latency_ms);
/* Create a bus to get the drop message on */
bus = gst_bus_new ();
gst_element_set_bus (h->element, bus);
/* Jump 1 second forward in time */
now = 1 * GST_SECOND;
gst_harness_set_time (h, now);
/* Push a packet with a gap of 3, that now is very late */
final_seqnum = next_seqnum + 3;
fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h,
generate_test_buffer_full (now,
final_seqnum, final_seqnum * TEST_RTP_TS_DURATION)));
/* Pull and unref pushed buffer */
gst_buffer_unref (gst_harness_pull (h));
/* The 3 missing packets are now pushed with half the message "interval" between them.
* When arriving they are considered as "too-late". Only the first and third should trigger
* a drop_msg, as the second is dropped during the interval where no new messages will be sent.
* The second should have num-too-late=2, as the "too-late" event that never sent a message
* still increments the count of dropped "too-late" buffers.
*/
for (; next_seqnum < final_seqnum; next_seqnum++) {
fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h,
generate_test_buffer (next_seqnum)));
num_late_not_sent++;
/* Pop a potential drop message and check its correctness */
while ((drop_msg = gst_bus_pop (bus)) != NULL) {
if (gst_message_has_name (drop_msg, "drop-msg")) {
fail_unless (check_drop_message (drop_msg, "too-late", next_seqnum,
num_late_not_sent));
num_late_not_sent = 0;
num_sent_msg++;
}
gst_message_unref (drop_msg);
}
/* Advance time half the minimum interval of sending drop messages */
now += (interval * GST_MSECOND) / 2;
gst_harness_set_time (h, now);
}
/* Exactly two drop messages should have been sent */
fail_unless (num_sent_msg == 2);
/* Cleanup */
gst_element_set_bus (h->element, NULL);
gst_object_unref (bus);
gst_harness_teardown (h);
}
GST_END_TEST;
static Suite *
rtpjitterbuffer_suite (void)
{
@ -3090,6 +3376,11 @@ rtpjitterbuffer_suite (void)
tcase_add_test (tc_chain, test_performance);
tcase_add_test (tc_chain, test_drop_messages_too_late);
tcase_add_test (tc_chain, test_drop_messages_already_lost);
tcase_add_test (tc_chain, test_drop_messages_drop_on_latency);
tcase_add_test (tc_chain, test_drop_messages_interval);
return s;
}