avtp: Introduce the CRF Sync Element

This commit introduces the AVTP Clock Reference Format (CRF) Synchronizer
element. This element implements the AVTP CRF Listener as described in IEEE
1722-2016 Section 10.

CRF is useful in synchronizing events within different systems by
distributing a common clock. This is useful in a scenario where there are
multiple talkers who are sending data to a single listener which is
processing that data. E.g.  CCTV cameras on a network sending AVTP video
streams to a base station to display on the same screen.

It is assumed that all the systems are already time-synchronized with each
other. So, the AVTP Talker essentially adjusts the AVTP Presentation Time
so it's phase-locked with the reference clock provided by the CRF stream.

There are 2 different roles of systems which participate in CRF data
exchange.  A system can either be a CRF Talker, which samples it's own
clock and generates a stream of timestamps to transmit over the network, or
a CRF Listener, the system which receives the generated timestamps and
recovers the media clock from the timestamps. It then adjusts it's own
clock to align with recovered media clock. The timestamps generated by the
talker may not be continuous and the listener might have to interpolate
some timestamps to recover the media clock. The number of timestamps to
interpolate is mentioned in the CRF stream AVTPDU (Refer IEEE 1722-2016
Section 10.4 for AVTPDU structure). Only CRF Listener has been implemented
in this commit.

The CRF Sync element will create a separate thread to listen for the CRF
stream. This thread will calculate and store the average period of the
recovered media clock. The pipeline thread will use this stored period
along with the first timestamp of the latest CRF AVTPDU received to
calculate adjustment for timestamps in the audio/video streams. In case of
CRF AVTPDUs with single timestamp, two consecutive CRF AVTPDUs will be used
to figure out the average period of the recovered media clock.

In case of H264 streams, both AVTP timestamp and H264 timestamp will be
adjusted.

In the future commits, another "CRF Checker" element will be introduced
which will validate the timestamps on the AVTP Listener side. Which is why
a lot of code has been implemented as part of the gstcrfbase class.
This commit is contained in:
Vedang Patel 2020-02-05 16:17:39 -08:00 committed by vedangpatel1
parent 3ea0f694de
commit 12ad2a4bcd
8 changed files with 1148 additions and 6 deletions

View file

@ -82,6 +82,19 @@
* should use GstSytemClock with GST_CLOCK_TYPE_REALTIME as the pipeline
* clock.
*
* ### Clock Reference Format (CRF)
*
* Even though the systems are synchronized by PTP, it is possible that
* different talkers can send media streams which are out of phase or the
* frequencies do not exactly match. This is partcularly important when there
* is a single listener processing data from multiple talkers. The systems in
* this scenario can benefit if a common clock is distributed among the
* systems.
*
* This can be achieved by using the avtpcrfsync element which implements CRF
* as described in Chapter 10 of IEEE 1722-2016. For further details, look at
* the documentation for avtpcrfsync.
*
* ### Traffic Control Setup
*
* FQTSS (Forwarding and Queuing Enhancements for Time-Sensitive Streams) can be
@ -133,11 +146,11 @@
* Each element has its own configuration properties, with some being common
* to several elements. Basic properties are:
*
* * streamid (avtpaafpay, avtpcvfpay, avtpaafdepay, avtpcvfdepay): Stream ID
* associated with the stream.
* * streamid (avtpaafpay, avtpcvfpay, avtpaafdepay, avtpcvfdepay,
* avtpcrfsync): Stream ID associated with the stream.
*
* * ifname (avtpsink, avtpsrc): Network interface used to send/receive
* AVTP packets.
* * ifname (avtpsink, avtpsrc, avtpcrfsync): Network interface
* used to send/receive AVTP packets.
*
* * dst-macaddr (avtpsink, avtpsrc): Destination MAC address for the stream.
*
@ -177,7 +190,7 @@
*
* $ gst-launch-1.0 -k ptp videotestsrc is-live=true ! clockoverlay ! \
* x264enc ! avtpcvfpay processing-deadline=20000000 ! \
* avtpsink ifname=$IFNAME
* avtpcrfsync ifname=$IFNAME ! avtpsink ifname=$IFNAME
*
* On the AVTP listener host, the following pipeline can be used to get the
* AVTP stream, depacketize it and show it on the screen:
@ -223,6 +236,7 @@
#include "gstavtpcvfpay.h"
#include "gstavtpsink.h"
#include "gstavtpsrc.h"
#include "gstavtpcrfsync.h"
static gboolean
plugin_init (GstPlugin * plugin)
@ -239,6 +253,8 @@ plugin_init (GstPlugin * plugin)
return FALSE;
if (!gst_avtp_cvf_depay_plugin_init (plugin))
return FALSE;
if (!gst_avtp_crf_sync_plugin_init (plugin))
return FALSE;
return TRUE;
}

583
ext/avtp/gstavtpcrfbase.c Normal file
View file

@ -0,0 +1,583 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Library General Public License as published by
* the Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* See the GNU Library General Public License for more details. You should
* have received a copy of the GNU Library General Public License along with
* this library; if not , write to the Free Software Foundation, Inc., 51
* Franklin St, Fifth Floor, Boston, MA 02110 - 1301, USA.
*/
#include <arpa/inet.h>
#include <avtp.h>
#include <avtp_crf.h>
#include <glib.h>
#include <math.h>
#include <net/ethernet.h>
#include <net/if.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "gstavtpcrfutil.h"
#include "gstavtpcrfbase.h"
GST_DEBUG_CATEGORY_STATIC (avtpcrfbase_debug);
#define GST_CAT_DEFAULT (avtpcrfbase_debug)
#define CRF_TIMESTAMP_SIZE 8
#define MAX_AVTPDU_SIZE 1500
#define MAX_NUM_PERIODS_STORED 10
#define RECV_TIMEOUT 1 // in seconds
#define DEFAULT_STREAMID 0xAABBCCDDEEFF1000
#define DEFAULT_IFNAME "eth0"
#define DEFAULT_ADDRESS "01:AA:AA:AA:AA:AA"
enum
{
PROP_0,
PROP_STREAMID,
PROP_IFNAME,
PROP_ADDRESS,
};
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-avtp")
);
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-avtp")
);
static void gst_avtp_crf_base_finalize (GObject * gobject);
static void
gst_avtp_crf_base_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void
gst_avtp_crf_base_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstStateChangeReturn gst_avtp_crf_base_change_state (GstElement *
element, GstStateChange transition);
static void crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase);
#define gst_avtp_crf_base_parent_class parent_class
G_DEFINE_TYPE (GstAvtpCrfBase, gst_avtp_crf_base, GST_TYPE_BASE_TRANSFORM);
static void
gst_avtp_crf_base_class_init (GstAvtpCrfBaseClass * klass)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->finalize = GST_DEBUG_FUNCPTR (gst_avtp_crf_base_finalize);
object_class->get_property =
GST_DEBUG_FUNCPTR (gst_avtp_crf_base_get_property);
object_class->set_property =
GST_DEBUG_FUNCPTR (gst_avtp_crf_base_set_property);
g_object_class_install_property (object_class, PROP_STREAMID,
g_param_spec_uint64 ("streamid", "Stream ID",
"Stream ID associated with the CRF AVTPDU", 0, G_MAXUINT64,
DEFAULT_STREAMID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
g_object_class_install_property (object_class, PROP_IFNAME,
g_param_spec_string ("ifname", "Interface Name",
"Network interface utilized to receive CRF AVTPDUs",
DEFAULT_IFNAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
g_object_class_install_property (object_class, PROP_ADDRESS,
g_param_spec_string ("address", "Destination MAC address",
"Destination MAC address expected on the Ethernet frames",
DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
element_class->change_state =
GST_DEBUG_FUNCPTR (gst_avtp_crf_base_change_state);
gst_element_class_add_static_pad_template (element_class, &sink_template);
gst_element_class_add_static_pad_template (element_class, &src_template);
GST_DEBUG_CATEGORY_INIT (avtpcrfbase_debug, "avtpcrfbase", 0, "CRF Base");
}
static void
gst_avtp_crf_base_init (GstAvtpCrfBase * avtpcrfbase)
{
avtpcrfbase->streamid = DEFAULT_STREAMID;
avtpcrfbase->ifname = g_strdup (DEFAULT_IFNAME);
avtpcrfbase->address = g_strdup (DEFAULT_ADDRESS);
}
static GstStateChangeReturn
gst_avtp_crf_base_change_state (GstElement * element, GstStateChange transition)
{
GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (element);
GstAvtpCrfThreadData *thread_data = &avtpcrfbase->thread_data;
GstStateChangeReturn res;
GError *error = NULL;
GST_DEBUG_OBJECT (avtpcrfbase, "transition %d", transition);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
thread_data->past_periods =
g_malloc0 (sizeof (guint64) * MAX_NUM_PERIODS_STORED);
thread_data->mr = -1;
thread_data->is_running = TRUE;
thread_data->thread =
g_thread_try_new ("crf-listener",
(GThreadFunc) crf_listener_thread_func, avtpcrfbase, &error);
if (error) {
GST_ERROR_OBJECT (avtpcrfbase, "failed to start thread, %s",
error->message);
g_error_free (error);
g_free (thread_data->past_periods);
return GST_STATE_CHANGE_FAILURE;
}
break;
default:
break;
}
res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_NULL:
thread_data->is_running = FALSE;
g_thread_join (thread_data->thread);
g_free (thread_data->past_periods);
break;
default:
break;
}
return res;
}
static int
setup_socket (GstAvtpCrfBase * avtpcrfbase)
{
struct sockaddr_ll sk_addr = { 0 };
struct packet_mreq mreq = { 0 };
struct timeval timeout = { 0 };
guint8 addr[ETH_ALEN];
int fd, res, ifindex;
fd = socket (AF_PACKET, SOCK_DGRAM, htons (ETH_P_TSN));
if (fd < 0) {
GST_ERROR_OBJECT (avtpcrfbase, "Failed to open socket: %s",
strerror (errno));
return fd;
}
ifindex = if_nametoindex (avtpcrfbase->ifname);
if (!ifindex) {
res = -1;
GST_ERROR_OBJECT (avtpcrfbase, "Failed to get index for interface: %s",
strerror (errno));
goto err;
}
sk_addr.sll_family = AF_PACKET;
sk_addr.sll_protocol = htons (ETH_P_TSN);
sk_addr.sll_ifindex = ifindex;
res = bind (fd, (struct sockaddr *) &sk_addr, sizeof (sk_addr));
if (res < 0) {
GST_ERROR_OBJECT (avtpcrfbase, "Failed to bind socket: %s",
strerror (errno));
goto err;
}
res = sscanf (avtpcrfbase->address, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx",
&addr[0], &addr[1], &addr[2], &addr[3], &addr[4], &addr[5]);
if (res != 6) {
res = -1;
GST_ERROR_OBJECT (avtpcrfbase, "Destination MAC address format not valid");
goto err;
}
mreq.mr_ifindex = ifindex;
mreq.mr_type = PACKET_MR_MULTICAST;
mreq.mr_alen = ETH_ALEN;
memcpy (&mreq.mr_address, addr, ETH_ALEN);
res = setsockopt (fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &mreq,
sizeof (struct packet_mreq));
if (res < 0) {
GST_ERROR_OBJECT (avtpcrfbase, "Failed to set multicast address: %s",
strerror (errno));
goto err;
}
timeout.tv_sec = RECV_TIMEOUT;
res =
setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &timeout,
sizeof (struct timeval));
if (res < 0) {
GST_ERROR_OBJECT (avtpcrfbase, "Failed to set receive timeout: %s",
strerror (errno));
goto err;
}
return fd;
err:
close (fd);
return res;
}
static gboolean
validate_crf_pdu (GstAvtpCrfBase * avtpcrfbase, struct avtp_crf_pdu *crf_pdu,
int packet_size)
{
GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data;
guint64 tstamp_interval, base_freq, pull, type;
guint64 streamid_valid, streamid, data_len;
guint32 subtype;
int res;
if (packet_size < sizeof (struct avtp_crf_pdu))
return FALSE;
res = avtp_pdu_get ((struct avtp_common_pdu *) crf_pdu, AVTP_FIELD_SUBTYPE,
&subtype);
g_assert (res == 0);
if (subtype != AVTP_SUBTYPE_CRF) {
GST_DEBUG_OBJECT (avtpcrfbase, "Not a CRF PDU.subtype: %u", subtype);
return FALSE;
}
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_SV, &streamid_valid);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_STREAM_ID, &streamid);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_CRF_DATA_LEN, &data_len);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_TIMESTAMP_INTERVAL,
(guint64 *) & tstamp_interval);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_BASE_FREQ, &base_freq);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_PULL, &pull);
g_assert (res == 0);
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_TYPE, &type);
g_assert (res == 0);
if (!streamid_valid || streamid != avtpcrfbase->streamid) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Stream ID doesn't match. Discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (data_len + sizeof (struct avtp_crf_pdu) > packet_size)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Packet size smaller than expected. Discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (!data->timestamp_interval)) {
if (G_UNLIKELY (tstamp_interval == 0)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"timestamp_interval should not be zero. Discarding CRF packet");
return FALSE;
}
data->timestamp_interval = tstamp_interval;
if (G_UNLIKELY (base_freq == 0)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Base Frequency cannot be zero, Discarding CRF packet");
goto error;
}
data->base_freq = base_freq;
if (G_UNLIKELY (pull > AVTP_CRF_PULL_MULT_BY_1_OVER_8)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Pull value invalid, Discarding CRF packet");
goto error;
}
data->pull = pull;
if (G_UNLIKELY (type > AVTP_CRF_TYPE_MACHINE_CYCLE)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"CRF timestamp type invalid, Discarding CRF packet");
goto error;
}
data->type = type;
if (G_UNLIKELY (!data_len || data_len % 8 != 0)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Data Length should be a multiple of 8. Discarding CRF packet.");
goto error;
}
data->num_pkt_tstamps = data_len / CRF_TIMESTAMP_SIZE;
} else {
if (G_UNLIKELY (tstamp_interval != data->timestamp_interval)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Timestamp interval doesn't match, discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (base_freq != data->base_freq)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Base Frequency doesn't match, discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (pull != data->pull)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Pull value doesn't match, discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (data->type != type)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"CRF timestamp type doesn't match, Discarding CRF packet");
return FALSE;
}
if (G_UNLIKELY (data_len / CRF_TIMESTAMP_SIZE != data->num_pkt_tstamps)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Number of timestamps doesn't match. discarding CRF packet");
return FALSE;
}
}
/* Make sure all the timestamps are monotonically increasing. */
for (int i = 0; i < data->num_pkt_tstamps - 1; i++) {
GstClockTime tstamp, next_tstamp;
tstamp = be64toh (crf_pdu->crf_data[i]);
next_tstamp = be64toh (crf_pdu->crf_data[i + 1]);
if (G_UNLIKELY (tstamp >= next_tstamp)) {
GST_DEBUG_OBJECT (avtpcrfbase,
"Timestamps are not monotonically increasing. discarding CRF packet");
return FALSE;
}
}
return TRUE;
error:
data->timestamp_interval = 0;
return FALSE;
}
static gdouble
get_base_freq_multiplier (GstAvtpCrfBase * avtpcrfbase, guint64 pull)
{
switch (pull) {
case 0:
return 1.0;
case 1:
return 1 / 1.001;
case 2:
return 1.001;
case 3:
return 24.0 / 25;
case 4:
return 25.0 / 24;
case 5:
return 1.0 / 8;
default:
GST_ERROR_OBJECT (avtpcrfbase, "Invalid pull value");
return -1;
}
}
static void
calculate_average_period (GstAvtpCrfBase * avtpcrfbase,
struct avtp_crf_pdu *crf_pdu)
{
GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data;
GstClockTime first_pkt_tstamp, last_pkt_tstamp;
int num_pkt_tstamps, past_periods_iter;
GstClockTime accumulate_period = 0;
num_pkt_tstamps = data->num_pkt_tstamps;
past_periods_iter = data->past_periods_iter;
first_pkt_tstamp = be64toh (crf_pdu->crf_data[0]);
last_pkt_tstamp = be64toh (crf_pdu->crf_data[num_pkt_tstamps - 1]);
/*
* If there is only one CRF Timestamp per CRF AVTPU, at least two packets are
* needed to calculate the period. Also, sequence number needs to be checked
* to ensure consecutive packets are being used to calculate the period.
* Otherwise, we will just use the nominal frequency to estimate period.
*/
if (num_pkt_tstamps == 1) {
guint64 seqnum;
int res;
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_SEQ_NUM, &seqnum);
g_assert (res == 0);
if (!data->last_received_tstamp ||
((data->last_seqnum + 1) % 255 != seqnum)) {
GstClockTime average_period = data->average_period;
if (!data->last_received_tstamp) {
gdouble base_freq_mult;
base_freq_mult = get_base_freq_multiplier (avtpcrfbase, data->pull);
if (base_freq_mult < 0)
return;
average_period =
gst_util_uint64_scale (1.0, 1000000000,
(data->base_freq * base_freq_mult));
}
data->last_received_tstamp = first_pkt_tstamp;
data->last_seqnum = seqnum;
data->current_ts = first_pkt_tstamp;
data->average_period = average_period;
return;
}
data->past_periods[past_periods_iter] =
first_pkt_tstamp - data->last_received_tstamp;
data->last_received_tstamp = first_pkt_tstamp;
data->last_seqnum = seqnum;
} else {
data->past_periods[past_periods_iter] =
(last_pkt_tstamp - first_pkt_tstamp) /
(data->timestamp_interval * (num_pkt_tstamps - 1));
}
if (data->periods_stored < MAX_NUM_PERIODS_STORED)
data->periods_stored++;
data->past_periods_iter = (past_periods_iter + 1) % data->periods_stored;
for (int i = 0; i < data->periods_stored; i++)
accumulate_period += data->past_periods[i];
data->average_period = accumulate_period / data->periods_stored;
data->current_ts = first_pkt_tstamp;
}
static void
crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase)
{
GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data;
struct avtp_crf_pdu *crf_pdu = g_alloca (MAX_AVTPDU_SIZE);
guint64 media_clk_reset;
int fd, n, res;
fd = setup_socket (avtpcrfbase);
if (fd < 0) {
GST_ELEMENT_ERROR (avtpcrfbase, RESOURCE, OPEN_READ,
("Cannot open socket for CRF Listener"), (NULL));
return;
}
while (data->is_running) {
n = recv (fd, crf_pdu, MAX_AVTPDU_SIZE, 0);
if (n == -1) {
if (errno == EAGAIN || errno == EINTR)
continue;
GST_ERROR_OBJECT (avtpcrfbase, "Failed to receive packet: %s",
strerror (errno));
break;
}
if (!validate_crf_pdu (avtpcrfbase, crf_pdu, n))
continue;
GST_DEBUG_OBJECT (avtpcrfbase, "Packet valid. Adding to buffer\n");
res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_MR, &media_clk_reset);
g_assert (res == 0);
if (media_clk_reset != data->mr) {
memset (data->past_periods, 0, sizeof (gint64) * MAX_NUM_PERIODS_STORED);
data->periods_stored = 0;
data->average_period = 0;
data->current_ts = 0;
data->last_received_tstamp = 0;
data->past_periods_iter = 0;
data->mr = media_clk_reset;
}
calculate_average_period (avtpcrfbase, crf_pdu);
}
close (fd);
}
static void
gst_avtp_crf_base_finalize (GObject * object)
{
GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object);
g_free (avtpcrfbase->ifname);
g_free (avtpcrfbase->address);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_avtp_crf_base_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object);
GST_DEBUG_OBJECT (avtpcrfbase, "prop_id %u", prop_id);
switch (prop_id) {
case PROP_STREAMID:
avtpcrfbase->streamid = g_value_get_uint64 (value);
break;
case PROP_IFNAME:
g_free (avtpcrfbase->ifname);
avtpcrfbase->ifname = g_value_dup_string (value);
break;
case PROP_ADDRESS:
g_free (avtpcrfbase->address);
avtpcrfbase->address = g_value_dup_string (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_avtp_crf_base_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object);
GST_DEBUG_OBJECT (avtpcrfbase, "prop_id %u", prop_id);
switch (prop_id) {
case PROP_STREAMID:
g_value_set_uint64 (value, avtpcrfbase->streamid);
break;
case PROP_IFNAME:
g_value_set_string (value, avtpcrfbase->ifname);
break;
case PROP_ADDRESS:
g_value_set_string (value, avtpcrfbase->address);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}

86
ext/avtp/gstavtpcrfbase.h Normal file
View file

@ -0,0 +1,86 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_AVTP_CRF_BASE_H__
#define __GST_AVTP_CRF_BASE_H__
#include <gst/base/gstbasetransform.h>
#include <gst/gst.h>
#include <linux/if_packet.h>
G_BEGIN_DECLS
#define GST_TYPE_AVTP_CRF_BASE (gst_avtp_crf_base_get_type())
#define GST_AVTP_CRF_BASE(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AVTP_CRF_BASE,GstAvtpCrfBase))
#define GST_AVTP_CRF_BASE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AVTP_CRF_BASE,GstAvtpCrfBaseClass))
#define GST_IS_AVTP_CRF_BASE(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AVTP_CRF_BASE))
#define GST_IS_AVTP_CRF_BASE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AVTP_CRF_BASE))
typedef struct _GstAvtpCrfBase GstAvtpCrfBase;
typedef struct _GstAvtpCrfBaseClass GstAvtpCrfBaseClass;
typedef struct _GstAvtpCrfThreadData GstAvtpCrfThreadData;
struct _GstAvtpCrfThreadData
{
GThread *thread;
gboolean is_running;
guint64 num_pkt_tstamps;
GstClockTime timestamp_interval;
guint64 base_freq;
guint64 pull;
guint64 type;
guint64 mr;
GstClockTime *past_periods;
int past_periods_iter;
int periods_stored;
GstClockTime average_period;
GstClockTime current_ts;
GstClockTime last_received_tstamp;
guint64 last_seqnum;
};
struct _GstAvtpCrfBase
{
GstBaseTransform element;
guint64 streamid;
gchar *ifname;
gchar *address;
GstAvtpCrfThreadData thread_data;
};
struct _GstAvtpCrfBaseClass
{
GstBaseTransformClass parent_class;
GstPadEventFunction sink_event;
gpointer _gst_reserved[GST_PADDING];
};
GType gst_avtp_crf_base_get_type (void);
G_END_DECLS
#endif /* __GST_AVTP_CRF_BASE_H__ */

249
ext/avtp/gstavtpcrfsync.c Normal file
View file

@ -0,0 +1,249 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-avtpcrfsync
* @see_also: avtpcrfcheck
*
* Adjust the Presentation Time from AVTPDUs to align with the reference clock
* provided by the CRF stream. For detailed information see chapter 10 in
* https://standards.ieee.org/standard/1722-2016.html. A helpful aid for
* visualizing CRF and it's advantages can be found at
* http://grouper.ieee.org/groups/1722/contributions/2014/1722a-rsilfvast-Diagrams%20for%20Common%20Timing%20Grid%20and%20Presentation%20Time%20(for%20review%20and%20discussion).pdf
* (Look at page 1).
*
* <refsect2>
* <title>Example pipeline</title>
* |[
* gst-launch-1.0 audiotestsrc ! audioconvert ! avtpaafpay ! avtpcrfsync ! avtpsink
* ]| This example pipeline will adjust the timestamps for rawaudio payload.
* Refer to the avtpcrfcheck example to validate the adjusted timestamp.
* </refsect2>
*/
#include <avtp.h>
#include <avtp_aaf.h>
#include <avtp_crf.h>
#include <avtp_cvf.h>
#include <glib.h>
#include <math.h>
#include "gstavtpcrfbase.h"
#include "gstavtpcrfsync.h"
#include "gstavtpcrfutil.h"
GST_DEBUG_CATEGORY_STATIC (avtpcrfsync_debug);
#define GST_CAT_DEFAULT (avtpcrfsync_debug)
#define gst_avtp_crf_sync_parent_class parent_class
G_DEFINE_TYPE (GstAvtpCrfSync, gst_avtp_crf_sync, GST_TYPE_AVTP_CRF_BASE);
static GstFlowReturn gst_avtp_crf_sync_transform_ip (GstBaseTransform * parent,
GstBuffer * buffer);
static void
gst_avtp_crf_sync_class_init (GstAvtpCrfSyncClass * klass)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
gst_element_class_set_static_metadata (element_class,
"Clock Reference Format (CRF) Synchronizer",
"Filter/Network/AVTP",
"Synchronize Presentation Time from AVTPDUs so they are phase-locked with clock provided by CRF stream",
"Vedang Patel <vedang.patel@intel.com>");
GST_BASE_TRANSFORM_CLASS (klass)->transform_ip =
GST_DEBUG_FUNCPTR (gst_avtp_crf_sync_transform_ip);
GST_DEBUG_CATEGORY_INIT (avtpcrfsync_debug, "avtpcrfsync", 0,
"CRF Synchronizer");
}
static void
gst_avtp_crf_sync_init (GstAvtpCrfSync * avtpcrfsync)
{
/* Nothing to do here. */
}
static void
set_avtp_tstamp (GstAvtpCrfSync * avtpcrfsync, struct avtp_stream_pdu *pdu,
GstClockTime tstamp)
{
int res;
guint32 type;
res =
avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type);
g_assert (res == 0);
switch (type) {
case AVTP_SUBTYPE_AAF:
res = avtp_aaf_pdu_set (pdu, AVTP_AAF_FIELD_TIMESTAMP, tstamp);
g_assert (res == 0);
break;
case AVTP_SUBTYPE_CVF:
res = avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, tstamp);
g_assert (res == 0);
break;
default:
GST_ERROR_OBJECT (avtpcrfsync, "type 0x%x not supported.\n", type);
break;
}
}
static void
set_avtp_mr_bit (GstAvtpCrfSync * avtpcrfsync, struct avtp_stream_pdu *pdu,
guint64 mr)
{
int res;
guint32 type;
res =
avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type);
g_assert (res == 0);
switch (type) {
case AVTP_SUBTYPE_AAF:
res = avtp_aaf_pdu_set (pdu, AVTP_AAF_FIELD_MR, mr);
g_assert (res == 0);
break;
case AVTP_SUBTYPE_CVF:
res = avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_MR, mr);
g_assert (res == 0);
break;
default:
GST_ERROR_OBJECT (avtpcrfsync, "type 0x%x not supported.\n", type);
break;
}
}
static GstFlowReturn
gst_avtp_crf_sync_transform_ip (GstBaseTransform * parent, GstBuffer * buffer)
{
GstClockTime tstamp, h264_time = 0, adjusted_tstamp, adjusted_h264_time = 0;
GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (parent);
GstAvtpCrfSync *avtpcrfsync = GST_AVTP_CRF_SYNC (avtpcrfbase);
GstAvtpCrfThreadData *thread_data = &avtpcrfbase->thread_data;
GstClockTime current_ts = thread_data->current_ts;
gdouble avg_period = thread_data->average_period;
struct avtp_stream_pdu *pdu;
gboolean h264_packet;
GstMapInfo info;
gboolean res;
if (!avg_period || !current_ts)
return GST_FLOW_OK;
res = gst_buffer_map (buffer, &info, GST_MAP_READWRITE);
if (!res) {
GST_ELEMENT_ERROR (avtpcrfsync, RESOURCE, OPEN_WRITE,
("cannot access buffer"), (NULL));
return GST_FLOW_ERROR;
}
if (!buffer_size_valid (&info)) {
GST_DEBUG_OBJECT (avtpcrfsync, "Malformed AVTPDU, discarding it");
goto exit;
}
pdu = (struct avtp_stream_pdu *) info.data;
h264_packet = h264_tstamp_valid (pdu);
if (h264_packet) {
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, &h264_time);
g_assert (res == 0);
/*
* Extrapolate H264 tstamp to 64 bit and assume it's greater than CRF
* timestamp.
*/
h264_time |= current_ts & 0xFFFFFFFF00000000;
if (h264_time < current_ts)
h264_time += (1ULL << 32);
/*
* float typecasted to guint64 truncates the decimal part. So, round() it
* before casting.
*/
adjusted_h264_time =
(GstClockTime) roundl (current_ts + ceill (((gdouble) h264_time -
current_ts) / avg_period) * avg_period);
res =
avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP,
adjusted_h264_time);
g_assert (res == 0);
GST_LOG_OBJECT (avtpcrfsync,
"Adjust H264 timestamp in CVF packet. tstamp: %" G_GUINT64_FORMAT
" adjusted_tstamp: %" G_GUINT64_FORMAT,
h264_time & 0xFFFFFFFF, adjusted_h264_time & 0xFFFFFFFF);
}
tstamp = get_avtp_tstamp (avtpcrfbase, pdu);
if (tstamp == GST_CLOCK_TIME_NONE)
goto exit;
/*
* Extrapolate the 32-bit AVTP Timestamp to 64-bit and assume it's greater
* than the 64-bit CRF timestamp.
*/
tstamp |= current_ts & 0xFFFFFFFF00000000;
if (tstamp < current_ts)
tstamp += (1ULL << 32);
/*
* float typecasted to guint64 truncates the decimal part. So, round() it
* before casting.
*/
adjusted_tstamp =
(GstClockTime) roundl (current_ts + ceill ((tstamp -
current_ts) / avg_period) * avg_period);
set_avtp_tstamp (avtpcrfsync, pdu, adjusted_tstamp);
set_avtp_mr_bit (avtpcrfsync, pdu, thread_data->mr);
GST_LOG_OBJECT (avtpcrfsync,
"Adjust AVTP timestamp. tstamp: %" G_GUINT64_FORMAT
" Adjusted tstamp: %" G_GUINT64_FORMAT,
tstamp & 0xFFFFFFFF, adjusted_tstamp & 0xFFFFFFFF);
/*
* Since we adjusted the AVTP/H264 presentation times in the AVTPDU, we also
* need to adjust buffer times by the same amount so that the buffer is
* transmitted at the right time.
*/
if (h264_packet) {
if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE)
GST_BUFFER_DTS (buffer) += adjusted_tstamp - tstamp;
GST_BUFFER_PTS (buffer) += adjusted_h264_time - h264_time;
} else {
GST_BUFFER_PTS (buffer) += adjusted_tstamp - tstamp;
}
exit:
gst_buffer_unmap (buffer, &info);
return GST_FLOW_OK;
}
gboolean
gst_avtp_crf_sync_plugin_init (GstPlugin * plugin)
{
return gst_element_register (plugin, "avtpcrfsync", GST_RANK_NONE,
GST_TYPE_AVTP_CRF_SYNC);
}

56
ext/avtp/gstavtpcrfsync.h Normal file
View file

@ -0,0 +1,56 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_AVTP_CRF_SYNC_H__
#define __GST_AVTP_CRF_SYNC_H__
#include <gst/gst.h>
#include "gstavtpcrfbase.h"
G_BEGIN_DECLS
#define GST_TYPE_AVTP_CRF_SYNC (gst_avtp_crf_sync_get_type())
#define GST_AVTP_CRF_SYNC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AVTP_CRF_SYNC,GstAvtpCrfSync))
#define GST_AVTP_CRF_SYNC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AVTP_CRF_SYNC,GstAvtpCrfSyncClass))
#define GST_IS_AVTP_CRF_SYNC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AVTP_CRF_SYNC))
#define GST_IS_AVTP_CRF_SYNC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AVTP_CRF_SYNC))
typedef struct _GstAvtpCrfSync GstAvtpCrfSync;
typedef struct _GstAvtpCrfSyncClass GstAvtpCrfSyncClass;
struct _GstAvtpCrfSync
{
GstAvtpCrfBase avtpcrfbase;
};
struct _GstAvtpCrfSyncClass
{
GstAvtpCrfBaseClass parent_class;
};
GType gst_avtp_crf_sync_get_type (void);
gboolean gst_avtp_crf_sync_plugin_init (GstPlugin * plugin);
G_END_DECLS
#endif /* __GST_AVTP_CRF_SYNC_H__ */

117
ext/avtp/gstavtpcrfutil.c Normal file
View file

@ -0,0 +1,117 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <avtp.h>
#include <avtp_aaf.h>
#include <avtp_cvf.h>
#include <glib.h>
#include "gstavtpcrfutil.h"
#define AVTP_CVF_H264_HEADER_SIZE (sizeof(struct avtp_stream_pdu) + sizeof(guint32))
gboolean
buffer_size_valid (GstMapInfo * info)
{
struct avtp_stream_pdu *pdu;
guint64 subtype;
guint32 type;
int res;
if (info->size < sizeof (struct avtp_stream_pdu))
return FALSE;
pdu = (struct avtp_stream_pdu *) info->data;
res =
avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type);
g_assert (res == 0);
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_FORMAT_SUBTYPE, &subtype);
g_assert (res == 0);
if (type == AVTP_SUBTYPE_CVF && subtype == AVTP_CVF_FORMAT_SUBTYPE_H264
&& info->size < AVTP_CVF_H264_HEADER_SIZE)
return FALSE;
return TRUE;
}
GstClockTime
get_avtp_tstamp (GstAvtpCrfBase * avtpcrfbase, struct avtp_stream_pdu * pdu)
{
guint64 tstamp = GST_CLOCK_TIME_NONE, tstamp_valid;
guint32 type;
int res;
res =
avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type);
g_assert (res == 0);
switch (type) {
case AVTP_SUBTYPE_AAF:
res = avtp_aaf_pdu_get (pdu, AVTP_AAF_FIELD_TV, &tstamp_valid);
g_assert (res == 0);
if (!tstamp_valid)
break;
res = avtp_aaf_pdu_get (pdu, AVTP_AAF_FIELD_TIMESTAMP, &tstamp);
g_assert (res == 0);
break;
case AVTP_SUBTYPE_CVF:
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_TV, &tstamp_valid);
g_assert (res == 0);
if (!tstamp_valid)
break;
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_TIMESTAMP, &tstamp);
g_assert (res == 0);
break;
default:
GST_INFO_OBJECT (avtpcrfbase, "type 0x%x not supported.\n", type);
break;
}
return (GstClockTime) tstamp;
}
gboolean
h264_tstamp_valid (struct avtp_stream_pdu * pdu)
{
guint64 subtype, h264_time_valid;
guint32 type;
int res;
/*
* Validate H264 timestamp for H264 format. For more details about the
* timestamp look at IEEE 1722-2016 Section 8.5.3.1
*/
res =
avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type);
g_assert (res == 0);
if (type == AVTP_SUBTYPE_CVF) {
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_FORMAT_SUBTYPE, &subtype);
g_assert (res == 0);
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_PTV, &h264_time_valid);
g_assert (res == 0);
if (subtype == AVTP_CVF_FORMAT_SUBTYPE_H264 && h264_time_valid)
return TRUE;
}
return FALSE;
}

32
ext/avtp/gstavtpcrfutil.h Normal file
View file

@ -0,0 +1,32 @@
/*
* GStreamer AVTP Plugin
* Copyright (C) 2019 Intel Corporation
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_AVTP_CRF_UTILS_H__
#define __GST_AVTP_CRF_UTILS_H__
#include <avtp.h>
#include "gstavtpcrfbase.h"
gboolean buffer_size_valid (GstMapInfo * info);
GstClockTime get_avtp_tstamp (GstAvtpCrfBase * avtpcrfbase,
struct avtp_stream_pdu * pdu);
gboolean h264_tstamp_valid (struct avtp_stream_pdu * pdu);
#endif /* __GST_AVTP_CRF_UTILS_H__ */

View file

@ -8,6 +8,9 @@ avtp_sources = [
'gstavtpbasepayload.c',
'gstavtpsink.c',
'gstavtpsrc.c',
'gstavtpcrfutil.c',
'gstavtpcrfbase.c',
'gstavtpcrfsync.c',
]
avtp_dep = dependency('avtp', required: get_option('avtp'))
@ -17,7 +20,7 @@ if avtp_dep.found() and cc.has_type('struct sock_txtime', prefix : '#include <li
avtp_sources,
c_args : gst_plugins_bad_args,
include_directories : [configinc],
dependencies : [gstaudio_dep, gstvideo_dep, avtp_dep],
dependencies : [gstaudio_dep, gstvideo_dep, avtp_dep, libm],
install : true,
install_dir : plugins_install_dir,
)