avtpcvfpay: Ensure NAL fragments are transmitted following stream specs

TSN streams are expected to send packets to the network in a well
defined "pace", which is arbitrarily defined for each stream. This pace
is defined by the "measurement interval" property of a stream.

When the AVTP CVF payloader element - avtpcvfpay - fragments a video
frame that is too big to be sent to the network, it currently defines
that all fragments should be transmitted at the same time (via DTS
property of GstBuffers generated, as sink will use those to time the
transmission of the AVTPDU). This doesn't comply with stream definition,
which also has a limit on how many packets can be sent on a given
measurement interval.

This patch solves that by spreading in time the DTS of the GstBuffers
containing the AVTPDUs. Two new properties, "measurement-interval" and
"max-interval-frames", added to avptcvfpay element so that it knows
stream measurement interval and how many AVTPDUs it can send on any of
them. More details on the method used to proper spread DTS/PTS according
to measurement interval can be found in a code commentary inside this patch.

Tests also added for the new property and behaviour.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1004>
This commit is contained in:
Ederson de Souza 2020-04-03 10:40:43 -07:00 committed by GStreamer Merge Bot
parent 13d55627f0
commit 12838af353
3 changed files with 227 additions and 12 deletions

View file

@ -62,10 +62,14 @@ static GstStateChangeReturn gst_avtp_cvf_change_state (GstElement *
enum
{
PROP_0,
PROP_MTU
PROP_MTU,
PROP_MEASUREMENT_INTERVAL,
PROP_MAX_INTERVAL_FRAME
};
#define DEFAULT_MTU 1500
#define DEFAULT_MEASUREMENT_INTERVAL 250000
#define DEFAULT_MAX_INTERVAL_FRAMES 1
#define AVTP_CVF_H264_HEADER_SIZE (sizeof(struct avtp_stream_pdu) + sizeof(guint32))
#define FU_A_TYPE 28
@ -125,6 +129,18 @@ gst_avtp_cvf_pay_class_init (GstAvtpCvfPayClass * klass)
"Maximum Transit Unit (MTU) of underlying network in bytes", 0,
G_MAXUINT, DEFAULT_MTU, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MEASUREMENT_INTERVAL,
g_param_spec_uint64 ("measurement-interval", "Measurement Interval",
"Measurement interval of stream in nanoseconds", 0,
G_MAXUINT64, DEFAULT_MEASUREMENT_INTERVAL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_INTERVAL_FRAME,
g_param_spec_uint ("max-interval-frames", "Maximum Interval Frames",
"Maximum number of network frames to be sent on each Measurement Interval",
1, G_MAXUINT, DEFAULT_MAX_INTERVAL_FRAMES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_CATEGORY_INIT (avtpcvfpay_debug, "avtpcvfpay",
0, "debug category for avtpcvfpay element");
}
@ -135,6 +151,9 @@ gst_avtp_cvf_pay_init (GstAvtpCvfPay * avtpcvfpay)
avtpcvfpay->mtu = DEFAULT_MTU;
avtpcvfpay->header = NULL;
avtpcvfpay->nal_length_size = 0;
avtpcvfpay->measurement_interval = DEFAULT_MEASUREMENT_INTERVAL;
avtpcvfpay->max_interval_frames = DEFAULT_MAX_INTERVAL_FRAMES;
avtpcvfpay->last_interval_ct = 0;
}
static void
@ -145,10 +164,19 @@ gst_avtp_cvf_set_property (GObject * object, guint prop_id,
GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id);
if (prop_id == PROP_MTU) {
avtpcvfpay->mtu = g_value_get_uint (value);
} else {
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case PROP_MTU:
avtpcvfpay->mtu = g_value_get_uint (value);
break;
case PROP_MEASUREMENT_INTERVAL:
avtpcvfpay->measurement_interval = g_value_get_uint64 (value);
break;
case PROP_MAX_INTERVAL_FRAME:
avtpcvfpay->max_interval_frames = g_value_get_uint (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
@ -160,10 +188,19 @@ gst_avtp_cvf_get_property (GObject * object, guint prop_id,
GST_DEBUG_OBJECT (avtpcvfpay, "prop_id: %u", prop_id);
if (prop_id == PROP_MTU) {
g_value_set_uint (value, avtpcvfpay->mtu);
} else {
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case PROP_MTU:
g_value_set_uint (value, avtpcvfpay->mtu);
break;
case PROP_MEASUREMENT_INTERVAL:
g_value_set_uint64 (value, avtpcvfpay->measurement_interval);
break;
case PROP_MAX_INTERVAL_FRAME:
g_value_set_uint (value, avtpcvfpay->max_interval_frames);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
@ -371,6 +408,114 @@ gst_avtpcvpay_fragment_nal (GstAvtpCvfPay * avtpcvfpay, GstBuffer * nal,
return fragment;
}
static void
gst_avtp_cvf_pay_spread_ts (GstAvtpCvfPay * avtpcvfpay,
GPtrArray * avtp_packets)
{
/* A bit of the idea of what this function do:
*
* After fragmenting the NAL unit, we have a series of AVTPDUs (AVTP Data Units)
* that should be transmitted. They are going to be transmitted according to GstBuffer
* DTS (or PTS in case there's no DTS), but all of them have the same DTS, as they
* came from the same original NAL unit.
*
* However, TSN streams should send their data according to a "measurement interval",
* which is an arbitrary interval defined for the stream. For instance, a class A
* stream has measurement interval of 125us. Also, there's a MaxIntervalFrames
* parameter, that defines how many network frames can be sent on a given measurement
* interval. We also spread MaxIntervalFrames per measurement interval.
*
* To that end, this function will spread the DTS so that fragments follow measurement
* interval and MaxIntervalFrames, adjusting them to end before the actual DTS of the
* original NAL unit.
*
* Roughly, this function does:
*
* DTSn = DTSbase - (measurement_interval/MaxIntervalFrames) * (total - n - 1)
*
* Where:
* DTSn = DTS of nth fragment
* DTSbase = DTS of original NAL unit
* total = # of fragments
*
* Another issue that this function takes care of is avoiding DTSs that overlap between
* two different set of fragments. Assuming DTSlast is the DTS of the last fragment
* generated on previous call to this function, we don't want any DTSn for the current
* call to be smaller than DTSlast + (measurement_interval / MaxIntervalFrames). If
* that's the case, we adjust DTSbase to preserve this difference (so we don't schedule
* packets transmission times that violate stream spec). This will cause the last
* fragment DTS to be bigger than DTSbase - we emit a warning, as this may be a sign
* of a bad pipeline setup or inappropriate stream spec.
*
* Finally, we also avoid underflows - which would occur when DTSbase is zero or small
* enough. In this case, we'll again make last fragment DTS > DTSbase, so we log it.
*
*/
GstAvtpBasePayload *avtpbasepayload = GST_AVTP_BASE_PAYLOAD (avtpcvfpay);
gint i, ret;
guint len;
guint64 tx_interval, total_interval;
GstClockTime base_time, base_dts, rt;
GstBuffer *packet;
base_time = gst_element_get_base_time (GST_ELEMENT (avtpcvfpay));
base_dts = GST_BUFFER_DTS (g_ptr_array_index (avtp_packets, 0));
tx_interval =
avtpcvfpay->measurement_interval / avtpcvfpay->max_interval_frames;
len = avtp_packets->len;
total_interval = tx_interval * (len - 1);
/* We don't want packets transmission time to overlap, so let's ensure
* packets are scheduled after last interval used */
if (avtpcvfpay->last_interval_ct != 0) {
GstClockTime dts_ct, dts_rt;
ret =
gst_segment_to_running_time_full (&avtpbasepayload->segment,
GST_FORMAT_TIME, base_dts, &dts_rt);
if (ret == -1)
dts_rt = -dts_rt;
dts_ct = base_time + dts_rt;
if (dts_ct < avtpcvfpay->last_interval_ct + total_interval + tx_interval) {
base_dts +=
avtpcvfpay->last_interval_ct + total_interval + tx_interval - dts_ct;
GST_WARNING_OBJECT (avtpcvfpay,
"Not enough measurements intervals between frames to transmit fragments"
". Check stream transmission spec.");
}
}
/* Not enough room to spread tx before DTS (or we would underflow),
* add offset */
if (total_interval > base_dts) {
base_dts += total_interval - base_dts;
GST_INFO_OBJECT (avtpcvfpay,
"Not enough measurements intervals to transmit fragments before base "
"DTS. Check pipeline settings. Are we live?");
}
for (i = 0; i < len; i++) {
packet = g_ptr_array_index (avtp_packets, i);
GST_BUFFER_DTS (packet) = base_dts - tx_interval * (len - i - 1);
}
/* Remember last interval used, in clock time */
ret =
gst_segment_to_running_time_full (&avtpbasepayload->segment,
GST_FORMAT_TIME, GST_BUFFER_DTS (g_ptr_array_index (avtp_packets,
avtp_packets->len - 1)), &rt);
if (ret == -1)
rt = -rt;
avtpcvfpay->last_interval_ct = base_time + rt;
}
static gboolean
gst_avtp_cvf_pay_prepare_avtp_packets (GstAvtpCvfPay * avtpcvfpay,
GPtrArray * nals, GPtrArray * avtp_packets)
@ -480,6 +625,11 @@ gst_avtp_cvf_pay_prepare_avtp_packets (GstAvtpCvfPay * avtpcvfpay,
GST_LOG_OBJECT (avtpcvfpay, "Prepared %u AVTP packets", avtp_packets->len);
/* Ensure DTS/PTS respect stream transmit spec, so PDUs are transmitted
* according to measurement interval. */
if (avtp_packets->len > 0)
gst_avtp_cvf_pay_spread_ts (avtpcvfpay, avtp_packets);
return TRUE;
}

View file

@ -47,6 +47,9 @@ struct _GstAvtpCvfPay
GstBuffer *header;
guint mtu;
guint64 measurement_interval;
guint max_interval_frames;
guint64 last_interval_ct;
/* H.264 specific information */
guint8 nal_length_size;

View file

@ -139,6 +139,59 @@ compare_h264_avtpdu (struct avtp_stream_pdu *pdu, GstBuffer * buffer)
return result;
}
GST_START_TEST (test_payloader_spread_ts)
{
GstHarness *h;
GstBuffer *in, *out;
gint i, total_fragments, max_interval_frames;
guint64 first_tx_time, final_dts, measurement_interval = 250000;
/* Create the harness for the avtpcvfpay */
h = gst_harness_new_parse
("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=2000000 tu=125000 processing-deadline=0 mtu=128 measurement-interval=250000 max-interval-frames=3");
gst_harness_set_src_caps (h, generate_caps (4));
/* A 980 bytes NAL with mtu=128 should generate 10 fragments */
in = gst_harness_create_buffer (h, 980 + 4);
add_nal (in, 980, 7, 0);
GST_BUFFER_DTS (in) = final_dts = 1000000;
GST_BUFFER_PTS (in) = 2000000;
/* We now push the buffer, and check if we got ten from the avtpcvfpay */
gst_harness_push (h, in);
fail_unless_equals_int (gst_harness_buffers_received (h), 10);
/* Using max-interval-frames=3, we'll need 4 measurement intervals to send
* all fragments, with last one just about current DTS, and others
* progressively before that.
*
* So we should have something like:
*
* | 1st | 2nd | 3rd | 4th | Intervals
* | 1 2 3 | 4 5 6 | 7 8 9 | 10 | AVTPDUs in each interval (sharing same DTS/PTS)
*
* And PTS/DTS should increment by a
* measurement-interval / max-interval-frames for each AVTPDU.
*/
i = 0;
total_fragments = 10;
max_interval_frames = 3;
first_tx_time =
final_dts -
(measurement_interval / max_interval_frames) * (total_fragments - 1);
for (i = 0; i < 10; i++) {
out = gst_harness_pull (h);
fail_unless_equals_uint64 (GST_BUFFER_DTS (out), first_tx_time);
first_tx_time += measurement_interval / max_interval_frames;
}
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_payloader_downstream_eos)
{
GstHarness *h;
@ -332,12 +385,12 @@ GST_START_TEST (test_payloader_properties)
{
GstHarness *h;
GstElement *element;
guint mtu, mtt, tu;
guint64 streamid, processing_deadline;
guint mtu, mtt, tu, max_interval_frames;
guint64 streamid, processing_deadline, measurement_interval;
/* Create the harness for the avtpcvfpay */
h = gst_harness_new_parse
("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000");
("avtpcvfpay streamid=0xAABBCCDDEEFF0001 mtt=1000000 tu=2000000 mtu=100 processing-deadline=5000 measurement-interval=125000 max-interval-frames=3");
/* Check if all properties were properly set up */
element = gst_harness_find_element (h, "avtpcvfpay");
@ -357,6 +410,14 @@ GST_START_TEST (test_payloader_properties)
NULL);
fail_unless_equals_uint64 (processing_deadline, 5000);
g_object_get (G_OBJECT (element), "measurement-interval",
&measurement_interval, NULL);
fail_unless_equals_uint64 (measurement_interval, 125000);
g_object_get (G_OBJECT (element), "max-interval-frames",
&max_interval_frames, NULL);
fail_unless_equals_uint64 (max_interval_frames, 3);
gst_object_unref (element);
gst_harness_teardown (h);
}
@ -704,6 +765,7 @@ avtpcvfpay_suite (void)
tcase_add_test (tc_chain, test_payloader_no_codec_data);
tcase_add_test (tc_chain, test_payloader_zero_sized_nal);
tcase_add_test (tc_chain, test_payloader_downstream_eos);
tcase_add_test (tc_chain, test_payloader_spread_ts);
return s;
}