srtsrc: Fix timestamping

SRT provides the original timestamp of a packet (with drift/skew corrected for
local clock), which is what should be used for timestamping the outgoing
buffers. This ensures that we output the packets with the same timestamp (and by
extension rate) as the original feed.

Also detect if packets were dropped (by checking the sequence number) and
properly set DISCONT flag on the outgoing buffer.

Finally answer the latency queries

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1658>
This commit is contained in:
Edward Hervey 2020-10-06 11:45:36 +02:00 committed by GStreamer Merge Bot
parent 20d9283e3d
commit dd11e91c3b
4 changed files with 80 additions and 5 deletions

View file

@ -1247,7 +1247,8 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
gssize
gst_srt_object_read (GstSRTObject * srtobject,
guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
SRT_MSGCTRL * mctrl)
{
gssize len = 0;
gint poll_timeout;
@ -1335,7 +1336,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
}
len = srt_recvmsg (rsock, (char *) (data), size);
srt_msgctrl_init (mctrl);
len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
if (len == SRT_ERROR) {
gint srt_errno = srt_getlasterror (NULL);

View file

@ -98,7 +98,8 @@ gboolean gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar *u
gssize gst_srt_object_read (GstSRTObject * srtobject,
guint8 *data, gsize size,
GCancellable *cancellable,
GError **err);
GError **err,
SRT_MSGCTRL *mctrl);
gssize gst_srt_object_write (GstSRTObject * srtobject,
GstBufferList * headers,

View file

@ -97,6 +97,9 @@ gst_srt_src_start (GstBaseSrc * bsrc)
g_clear_error (&error);
}
/* Reset expected pktseq */
self->next_pktseq = 0;
return ret;
}
@ -118,6 +121,11 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
GstMapInfo info;
GError *err = NULL;
gssize recv_len;
GstClock *clock;
GstClockTime base_time;
GstClockTime capture_time;
GstClockTime delay;
SRT_MSGCTRL mctrl;
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
@ -130,11 +138,30 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
goto out;
}
/* Get clock and values */
clock = gst_element_get_clock (GST_ELEMENT (src));
base_time = gst_element_get_base_time (GST_ELEMENT (src));
recv_len = gst_srt_object_read (self->srtobject, info.data,
gst_buffer_get_size (outbuf), self->cancellable, &err);
gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl);
/* Capture clock values ASAP */
capture_time = gst_clock_get_time (clock);
#if SRT_VERSION_VALUE >= 0x10402
/* Use SRT clock value if available (SRT > 1.4.2) */
delay = (srt_time_now () - mctrl.srctime) * GST_USECOND;
#else
/* Else use the unix epoch monotonic clock */
delay = (g_get_real_time () - mctrl.srctime) * GST_USECOND;
#endif
gst_object_unref (clock);
gst_buffer_unmap (outbuf, &info);
GST_LOG_OBJECT (src,
"recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%"
G_GUINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime);
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
goto out;
@ -150,6 +177,29 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
goto out;
}
/* Detect discontinuities */
if (mctrl.pktseq != self->next_pktseq) {
GST_WARNING_OBJECT (src, "discont detected %d (expected: %d)",
mctrl.pktseq, self->next_pktseq);
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
}
/* pktseq is a 31bit field */
self->next_pktseq = (mctrl.pktseq + 1) % G_MAXINT32;
/* Subtract the base_time (since the pipeline started) ... */
if (capture_time > base_time)
capture_time -= base_time;
else
capture_time = 0;
/* And adjust by the delay */
if (capture_time > delay)
capture_time -= delay;
else
capture_time = 0;
GST_BUFFER_TIMESTAMP (outbuf) = capture_time;
GST_DEBUG_OBJECT (src, "delay:%" GST_TIME_FORMAT, GST_TIME_ARGS (delay));
gst_buffer_resize (outbuf, 0, recv_len);
GST_LOG_OBJECT (src,
@ -173,7 +223,8 @@ gst_srt_src_init (GstSRTSrc * self)
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
/* We do the timing ourselves */
gst_base_src_set_do_timestamp (GST_BASE_SRC (self), FALSE);
gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
@ -234,6 +285,24 @@ gst_srt_src_get_property (GObject * object,
}
}
static gboolean
gst_srt_src_query (GstBaseSrc * basesrc, GstQuery * query)
{
GstSRTSrc *self = GST_SRT_SRC (basesrc);
if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
gint latency;
if (!gst_structure_get_int (self->srtobject->parameters, "latency",
&latency))
latency = GST_SRT_DEFAULT_LATENCY;
gst_query_set_latency (query, TRUE, latency * GST_MSECOND,
latency * GST_MSECOND);
return TRUE;
} else {
return GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
}
}
static void
gst_srt_src_class_init (GstSRTSrcClass * klass)
{
@ -285,6 +354,7 @@ gst_srt_src_class_init (GstSRTSrcClass * klass)
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock);
gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop);
gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_srt_src_query);
gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill);
}

View file

@ -49,6 +49,8 @@ struct _GstSRTSrc {
GstSRTObject *srtobject;
GCancellable *cancellable;
guint32 next_pktseq;
};
struct _GstSRTSrcClass {