diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c index 3cdd1c1aa4..6a0bd5a790 100644 --- a/ext/srt/gstsrtobject.c +++ b/ext/srt/gstsrtobject.c @@ -1247,7 +1247,8 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject, gssize gst_srt_object_read (GstSRTObject * srtobject, - guint8 * data, gsize size, GCancellable * cancellable, GError ** error) + guint8 * data, gsize size, GCancellable * cancellable, GError ** error, + SRT_MSGCTRL * mctrl) { gssize len = 0; gint poll_timeout; @@ -1335,7 +1336,8 @@ gst_srt_object_read (GstSRTObject * srtobject, } - len = srt_recvmsg (rsock, (char *) (data), size); + srt_msgctrl_init (mctrl); + len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl); if (len == SRT_ERROR) { gint srt_errno = srt_getlasterror (NULL); diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h index 302aa89cc8..09950feaf2 100644 --- a/ext/srt/gstsrtobject.h +++ b/ext/srt/gstsrtobject.h @@ -98,7 +98,8 @@ gboolean gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar *u gssize gst_srt_object_read (GstSRTObject * srtobject, guint8 *data, gsize size, GCancellable *cancellable, - GError **err); + GError **err, + SRT_MSGCTRL *mctrl); gssize gst_srt_object_write (GstSRTObject * srtobject, GstBufferList * headers, diff --git a/ext/srt/gstsrtsrc.c b/ext/srt/gstsrtsrc.c index 2b898ea79a..c886e39f32 100644 --- a/ext/srt/gstsrtsrc.c +++ b/ext/srt/gstsrtsrc.c @@ -97,6 +97,9 @@ gst_srt_src_start (GstBaseSrc * bsrc) g_clear_error (&error); } + /* Reset expected pktseq */ + self->next_pktseq = 0; + return ret; } @@ -118,6 +121,11 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf) GstMapInfo info; GError *err = NULL; gssize recv_len; + GstClock *clock; + GstClockTime base_time; + GstClockTime capture_time; + GstClockTime delay; + SRT_MSGCTRL mctrl; if (g_cancellable_is_cancelled (self->cancellable)) { ret = GST_FLOW_FLUSHING; @@ -130,11 +138,30 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf) goto out; } + /* Get clock and values */ + clock = gst_element_get_clock (GST_ELEMENT (src)); + base_time = gst_element_get_base_time (GST_ELEMENT (src)); + recv_len = gst_srt_object_read (self->srtobject, info.data, - gst_buffer_get_size (outbuf), self->cancellable, &err); + gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl); + + /* Capture clock values ASAP */ + capture_time = gst_clock_get_time (clock); +#if SRT_VERSION_VALUE >= 0x10402 + /* Use SRT clock value if available (SRT > 1.4.2) */ + delay = (srt_time_now () - mctrl.srctime) * GST_USECOND; +#else + /* Else use the unix epoch monotonic clock */ + delay = (g_get_real_time () - mctrl.srctime) * GST_USECOND; +#endif + gst_object_unref (clock); gst_buffer_unmap (outbuf, &info); + GST_LOG_OBJECT (src, + "recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%" + G_GUINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime); + if (g_cancellable_is_cancelled (self->cancellable)) { ret = GST_FLOW_FLUSHING; goto out; @@ -150,6 +177,29 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf) goto out; } + /* Detect discontinuities */ + if (mctrl.pktseq != self->next_pktseq) { + GST_WARNING_OBJECT (src, "discont detected %d (expected: %d)", + mctrl.pktseq, self->next_pktseq); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + } + /* pktseq is a 31bit field */ + self->next_pktseq = (mctrl.pktseq + 1) % G_MAXINT32; + + /* Subtract the base_time (since the pipeline started) ... */ + if (capture_time > base_time) + capture_time -= base_time; + else + capture_time = 0; + /* And adjust by the delay */ + if (capture_time > delay) + capture_time -= delay; + else + capture_time = 0; + GST_BUFFER_TIMESTAMP (outbuf) = capture_time; + + GST_DEBUG_OBJECT (src, "delay:%" GST_TIME_FORMAT, GST_TIME_ARGS (delay)); + gst_buffer_resize (outbuf, 0, recv_len); GST_LOG_OBJECT (src, @@ -173,7 +223,8 @@ gst_srt_src_init (GstSRTSrc * self) gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME); gst_base_src_set_live (GST_BASE_SRC (self), TRUE); - gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE); + /* We do the timing ourselves */ + gst_base_src_set_do_timestamp (GST_BASE_SRC (self), FALSE); gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL); @@ -234,6 +285,24 @@ gst_srt_src_get_property (GObject * object, } } +static gboolean +gst_srt_src_query (GstBaseSrc * basesrc, GstQuery * query) +{ + GstSRTSrc *self = GST_SRT_SRC (basesrc); + + if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) { + gint latency; + if (!gst_structure_get_int (self->srtobject->parameters, "latency", + &latency)) + latency = GST_SRT_DEFAULT_LATENCY; + gst_query_set_latency (query, TRUE, latency * GST_MSECOND, + latency * GST_MSECOND); + return TRUE; + } else { + return GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query); + } +} + static void gst_srt_src_class_init (GstSRTSrcClass * klass) { @@ -285,6 +354,7 @@ gst_srt_src_class_init (GstSRTSrcClass * klass) gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop); gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock); gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop); + gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_srt_src_query); gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill); } diff --git a/ext/srt/gstsrtsrc.h b/ext/srt/gstsrtsrc.h index af0b83362e..0ca9a4b84a 100644 --- a/ext/srt/gstsrtsrc.h +++ b/ext/srt/gstsrtsrc.h @@ -49,6 +49,8 @@ struct _GstSRTSrc { GstSRTObject *srtobject; GCancellable *cancellable; + + guint32 next_pktseq; }; struct _GstSRTSrcClass {