gstreamer/subprojects/gst-rtsp-server/examples/test-onvif-server.c
Branko Subasic 2689277a6b rtponviftimestamp: add support for using reference timestamps
Make it posible to configure the element to obtain the timestamps from
reference timestamp meta data instead of using the ntp-offset property,
or estimating its own offset. Currently the only time format supported
is "timestamp/x-unix", i.e. UTC time expressed in the unix time epoch.

In addition the custom event GstNtpOffset has been renamed to
GstOnvifTimestamp, to reflect that it is not necessarily used to convey
the ntp-offset. As a consequence we had to modify a couple of files in
the rtsp-server as well.

Fixes 

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1683>
2022-03-11 08:39:50 +00:00

654 lines
17 KiB
C

/* GStreamer
* Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include "test-onvif-server.h"
GST_DEBUG_CATEGORY_STATIC (onvif_server_debug);
#define GST_CAT_DEFAULT (onvif_server_debug)
#define MAKE_AND_ADD(var, pipe, name, label, elem_name) \
G_STMT_START { \
if (G_UNLIKELY (!(var = (gst_element_factory_make (name, elem_name))))) { \
GST_ERROR ("Could not create element %s", name); \
goto label; \
} \
if (G_UNLIKELY (!gst_bin_add (GST_BIN_CAST (pipe), var))) { \
GST_ERROR ("Could not add element %s", name); \
goto label; \
} \
} G_STMT_END
/* This simulates an archive of recordings running from 01-01-1900 to 01-01-2000.
*
* This is implemented by repeating the file provided at the command line, with
* an empty interval of 5 seconds in-between. We intercept relevant events to
* translate them, and update the timestamps on the output buffers.
*/
#define INTERVAL (5 * GST_SECOND)
/* January the first, 2000 */
#define END_DATE 3155673600 * GST_SECOND
static gchar *filename;
struct _ReplayBin
{
GstBin parent;
GstEvent *incoming_seek;
GstEvent *outgoing_seek;
GstClockTime trickmode_interval;
GstSegment segment;
const GstSegment *incoming_segment;
gboolean sent_segment;
GstClockTime ts_offset;
gint64 remainder;
GstClockTime min_pts;
};
G_DEFINE_TYPE (ReplayBin, replay_bin, GST_TYPE_BIN);
static void
replay_bin_init (ReplayBin * self)
{
self->incoming_seek = NULL;
self->outgoing_seek = NULL;
self->trickmode_interval = 0;
self->ts_offset = 0;
self->sent_segment = FALSE;
self->min_pts = GST_CLOCK_TIME_NONE;
}
static void
replay_bin_class_init (ReplayBinClass * klass)
{
}
static GstElement *
replay_bin_new (void)
{
return GST_ELEMENT (g_object_new (replay_bin_get_type (), NULL));
}
static void
demux_pad_added_cb (GstElement * demux, GstPad * pad, GstGhostPad * ghost)
{
GstCaps *caps = gst_pad_get_current_caps (pad);
GstStructure *s = gst_caps_get_structure (caps, 0);
if (gst_structure_has_name (s, "video/x-h264")) {
gst_ghost_pad_set_target (ghost, pad);
}
gst_caps_unref (caps);
}
static void
query_seekable (GstPad * ghost, gint64 * start, gint64 * stop)
{
GstPad *target;
GstQuery *query;
GstFormat format;
gboolean seekable;
target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
query = gst_query_new_seeking (GST_FORMAT_TIME);
gst_pad_query (target, query);
gst_query_parse_seeking (query, &format, &seekable, start, stop);
g_assert (seekable);
gst_object_unref (target);
}
static GstEvent *
translate_seek (ReplayBin * self, GstPad * pad, GstEvent * ievent)
{
GstEvent *oevent = NULL;
gdouble rate;
GstFormat format;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gint64 istart, istop; /* Incoming */
gint64 ustart, ustop; /* Upstream */
gint64 ostart, ostop; /* Outgoing */
guint32 seqnum = gst_event_get_seqnum (ievent);
gst_event_parse_seek (ievent, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
if (!GST_CLOCK_TIME_IS_VALID (stop))
stop = END_DATE;
gst_event_parse_seek_trickmode_interval (ievent, &self->trickmode_interval);
istart = start;
istop = stop;
query_seekable (pad, &ustart, &ustop);
if (rate > 0) {
/* First, from where we should seek the file */
ostart = istart % (ustop + INTERVAL);
/* This may end up in our empty interval */
if (ostart > ustop) {
istart += ostart - ustop;
ostart = 0;
}
/* Then, up to where we should seek it */
ostop = MIN (ustop, ostart + (istop - istart));
} else {
/* First up to where we should seek the file */
ostop = istop % (ustop + INTERVAL);
/* This may end up in our empty interval */
if (ostop > ustop) {
istop -= ostop - ustop;
ostop = ustop;
}
ostart = MAX (0, ostop - (istop - istart));
}
/* We may be left with nothing to actually play, in this
* case we won't seek upstream, and emit the expected events
* ourselves */
if (istart > istop) {
GstSegment segment;
GstEvent *event;
gboolean update;
event = gst_event_new_flush_start ();
gst_event_set_seqnum (event, seqnum);
gst_pad_push_event (pad, event);
event = gst_event_new_flush_stop (TRUE);
gst_event_set_seqnum (event, seqnum);
gst_pad_push_event (pad, event);
gst_segment_init (&segment, format);
gst_segment_do_seek (&segment, rate, format, flags, start_type, start,
stop_type, stop, &update);
event = gst_event_new_segment (&segment);
gst_event_set_seqnum (event, seqnum);
gst_pad_push_event (pad, event);
event = gst_event_new_eos ();
gst_event_set_seqnum (event, seqnum);
gst_pad_push_event (pad, event);
goto done;
}
/* Lastly, how much will remain to play back (this remainder includes the interval) */
if (stop - start > ostop - ostart)
self->remainder = (stop - start) - (ostop - ostart);
flags |= GST_SEEK_FLAG_SEGMENT;
oevent =
gst_event_new_seek (rate, format, flags, start_type, ostart, stop_type,
ostop);
gst_event_set_seek_trickmode_interval (oevent, self->trickmode_interval);
gst_event_set_seqnum (oevent, seqnum);
GST_DEBUG ("Translated event to %" GST_PTR_FORMAT
" (remainder: %" G_GINT64_FORMAT ")", oevent, self->remainder);
done:
return oevent;
}
static gboolean
replay_bin_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
{
ReplayBin *self = REPLAY_BIN (parent);
gboolean ret = TRUE;
gboolean forward = TRUE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
{
GST_DEBUG ("Processing seek event %" GST_PTR_FORMAT, event);
self->incoming_seek = event;
gst_event_replace (&self->outgoing_seek, NULL);
self->sent_segment = FALSE;
event = translate_seek (self, pad, event);
if (!event)
forward = FALSE;
else
self->outgoing_seek = gst_event_ref (event);
break;
}
default:
break;
}
if (forward)
return gst_pad_event_default (pad, parent, event);
else
return ret;
}
static gboolean
replay_bin_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
{
ReplayBin *self = REPLAY_BIN (parent);
gboolean ret = TRUE;
gboolean forward = TRUE;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_SEEKING:
/* We are seekable from the beginning till the end of time */
gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
GST_CLOCK_TIME_NONE);
forward = FALSE;
break;
case GST_QUERY_SEGMENT:
gst_query_set_segment (query, self->segment.rate, self->segment.format,
self->segment.start, self->segment.stop);
forward = FALSE;
default:
break;
}
GST_DEBUG ("Processed query %" GST_PTR_FORMAT, query);
if (forward)
return gst_pad_query_default (pad, parent, query);
else
return ret;
}
static GstEvent *
translate_segment (GstPad * pad, GstEvent * ievent)
{
ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
GstEvent *ret;
gdouble irate, orate;
GstFormat iformat, oformat;
GstSeekFlags iflags, oflags;
GstSeekType istart_type, ostart_type, istop_type, ostop_type;
gint64 istart, ostart, istop, ostop;
gboolean update;
gst_event_parse_segment (ievent, &self->incoming_segment);
if (!self->outgoing_seek) {
GstSegment segment;
gboolean update;
gst_segment_init (&segment, GST_FORMAT_TIME);
gst_segment_do_seek (&segment, 1.0, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET,
0, GST_SEEK_TYPE_SET, END_DATE, &update);
ret = gst_event_new_segment (&segment);
gst_event_unref (ievent);
goto done;
}
if (!self->sent_segment) {
gst_event_parse_seek (self->incoming_seek, &irate, &iformat, &iflags,
&istart_type, &istart, &istop_type, &istop);
gst_event_parse_seek (self->outgoing_seek, &orate, &oformat, &oflags,
&ostart_type, &ostart, &ostop_type, &ostop);
if (istop == -1)
istop = END_DATE;
if (self->incoming_segment->rate > 0)
self->ts_offset = istart - ostart;
else
self->ts_offset = istop - ostop;
istart += self->incoming_segment->start - ostart;
istop += self->incoming_segment->stop - ostop;
gst_segment_init (&self->segment, self->incoming_segment->format);
gst_segment_do_seek (&self->segment, self->incoming_segment->rate,
self->incoming_segment->format,
(GstSeekFlags) self->incoming_segment->flags, GST_SEEK_TYPE_SET,
(guint64) istart, GST_SEEK_TYPE_SET, (guint64) istop, &update);
self->min_pts = istart;
ret = gst_event_new_segment (&self->segment);
self->sent_segment = TRUE;
gst_event_unref (ievent);
GST_DEBUG ("Translated segment: %" GST_PTR_FORMAT ", "
"ts_offset: %" G_GUINT64_FORMAT, ret, self->ts_offset);
} else {
ret = NULL;
}
done:
return ret;
}
static void
handle_segment_done (ReplayBin * self, GstPad * pad)
{
GstEvent *event;
if (self->remainder < INTERVAL) {
self->remainder = 0;
event = gst_event_new_eos ();
gst_event_set_seqnum (event, gst_event_get_seqnum (self->incoming_seek));
gst_pad_push_event (pad, event);
} else {
gint64 ustart, ustop;
gint64 ostart, ostop;
GstPad *target;
GstStructure *s;
/* Signify the end of a contiguous section of recording */
s = gst_structure_new ("GstOnvifTimestamp",
"ntp-offset", G_TYPE_UINT64, 0, "discont", G_TYPE_BOOLEAN, TRUE, NULL);
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, s);
gst_pad_push_event (pad, event);
query_seekable (pad, &ustart, &ustop);
self->remainder -= INTERVAL;
if (self->incoming_segment->rate > 0) {
ostart = 0;
ostop = MIN (ustop, self->remainder);
} else {
ostart = MAX (ustop - self->remainder, 0);
ostop = ustop;
}
self->remainder = MAX (self->remainder - ostop - ostart, 0);
event =
gst_event_new_seek (self->segment.rate, self->segment.format,
self->segment.flags & ~GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, ostart,
GST_SEEK_TYPE_SET, ostop);
gst_event_set_seek_trickmode_interval (event, self->trickmode_interval);
if (self->incoming_segment->rate > 0)
self->ts_offset += INTERVAL + ustop;
else
self->ts_offset -= INTERVAL + ustop;
GST_DEBUG ("New offset: %" GST_TIME_FORMAT,
GST_TIME_ARGS (self->ts_offset));
GST_DEBUG ("Seeking to %" GST_PTR_FORMAT, event);
target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
gst_pad_send_event (target, event);
gst_object_unref (target);
}
}
static GstPadProbeReturn
replay_bin_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
{
ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
GST_DEBUG ("Probed %" GST_PTR_FORMAT, info->data);
switch (GST_EVENT_TYPE (info->data)) {
case GST_EVENT_SEGMENT:
{
GstEvent *translated;
GST_DEBUG ("Probed segment %" GST_PTR_FORMAT, info->data);
translated = translate_segment (pad, GST_EVENT (info->data));
if (translated)
info->data = translated;
else
ret = GST_PAD_PROBE_HANDLED;
break;
}
case GST_EVENT_SEGMENT_DONE:
{
handle_segment_done (self, pad);
ret = GST_PAD_PROBE_HANDLED;
break;
}
default:
break;
}
return ret;
}
static GstPadProbeReturn
replay_bin_buffer_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
{
ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
if (GST_BUFFER_PTS (info->data) > self->incoming_segment->stop) {
ret = GST_PAD_PROBE_DROP;
goto done;
}
if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (info->data)))
GST_BUFFER_PTS (info->data) += self->ts_offset;
if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (info->data)))
GST_BUFFER_DTS (info->data) += self->ts_offset;
GST_LOG ("Pushing buffer %" GST_PTR_FORMAT, info->data);
done:
return ret;
}
static GstElement *
create_replay_bin (GstElement * parent)
{
GstElement *ret, *src, *demux;
GstPad *ghost;
ret = replay_bin_new ();
if (!gst_bin_add (GST_BIN (parent), ret)) {
gst_object_unref (ret);
goto fail;
}
MAKE_AND_ADD (src, ret, "filesrc", fail, NULL);
MAKE_AND_ADD (demux, ret, "qtdemux", fail, NULL);
ghost = gst_ghost_pad_new_no_target ("src", GST_PAD_SRC);
gst_element_add_pad (ret, ghost);
gst_pad_set_event_function (ghost, replay_bin_event_func);
gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
replay_bin_event_probe, NULL, NULL);
gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_BUFFER, replay_bin_buffer_probe,
NULL, NULL);
gst_pad_set_query_function (ghost, replay_bin_query_func);
if (!gst_element_link (src, demux))
goto fail;
g_object_set (src, "location", filename, NULL);
g_signal_connect (demux, "pad-added", G_CALLBACK (demux_pad_added_cb), ghost);
done:
return ret;
fail:
ret = NULL;
goto done;
}
/* A simple factory to set up our replay bin */
struct _OnvifFactory
{
GstRTSPOnvifMediaFactory parent;
};
G_DEFINE_TYPE (OnvifFactory, onvif_factory, GST_TYPE_RTSP_MEDIA_FACTORY);
static void
onvif_factory_init (OnvifFactory * factory)
{
}
static GstElement *
onvif_factory_create_element (GstRTSPMediaFactory * factory,
const GstRTSPUrl * url)
{
GstElement *replay_bin, *q1, *parse, *pay, *onvifts, *q2;
GstElement *ret = gst_bin_new (NULL);
GstElement *pbin = gst_bin_new ("pay0");
GstPad *sinkpad, *srcpad;
if (!(replay_bin = create_replay_bin (ret)))
goto fail;
MAKE_AND_ADD (q1, pbin, "queue", fail, NULL);
MAKE_AND_ADD (parse, pbin, "h264parse", fail, NULL);
MAKE_AND_ADD (pay, pbin, "rtph264pay", fail, NULL);
MAKE_AND_ADD (onvifts, pbin, "rtponviftimestamp", fail, NULL);
MAKE_AND_ADD (q2, pbin, "queue", fail, NULL);
gst_bin_add (GST_BIN (ret), pbin);
if (!gst_element_link_many (q1, parse, pay, onvifts, q2, NULL))
goto fail;
sinkpad = gst_element_get_static_pad (q1, "sink");
gst_element_add_pad (pbin, gst_ghost_pad_new ("sink", sinkpad));
gst_object_unref (sinkpad);
if (!gst_element_link (replay_bin, pbin))
goto fail;
srcpad = gst_element_get_static_pad (q2, "src");
gst_element_add_pad (pbin, gst_ghost_pad_new ("src", srcpad));
gst_object_unref (srcpad);
g_object_set (onvifts, "set-t-bit", TRUE, "set-e-bit", TRUE, "ntp-offset",
G_GUINT64_CONSTANT (0), "drop-out-of-segment", FALSE, NULL);
gst_element_set_clock (onvifts, gst_system_clock_obtain ());
done:
return ret;
fail:
gst_object_unref (ret);
ret = NULL;
goto done;
}
static void
onvif_factory_class_init (OnvifFactoryClass * klass)
{
GstRTSPMediaFactoryClass *mf_class = GST_RTSP_MEDIA_FACTORY_CLASS (klass);
mf_class->create_element = onvif_factory_create_element;
}
static GstRTSPMediaFactory *
onvif_factory_new (void)
{
GstRTSPMediaFactory *result;
result =
GST_RTSP_MEDIA_FACTORY (g_object_new (onvif_factory_get_type (), NULL));
return result;
}
int
main (int argc, char *argv[])
{
GMainLoop *loop;
GstRTSPServer *server;
GstRTSPMountPoints *mounts;
GstRTSPMediaFactory *factory;
GOptionContext *optctx;
GError *error = NULL;
gchar *service;
optctx = g_option_context_new ("<filename.mp4> - ONVIF RTSP Server, MP4");
g_option_context_add_group (optctx, gst_init_get_option_group ());
if (!g_option_context_parse (optctx, &argc, &argv, &error)) {
g_printerr ("Error parsing options: %s\n", error->message);
g_option_context_free (optctx);
g_clear_error (&error);
return -1;
}
if (argc < 2) {
g_print ("%s\n", g_option_context_get_help (optctx, TRUE, NULL));
return 1;
}
filename = argv[1];
g_option_context_free (optctx);
GST_DEBUG_CATEGORY_INIT (onvif_server_debug, "onvif-server", 0,
"ONVIF server");
loop = g_main_loop_new (NULL, FALSE);
server = gst_rtsp_onvif_server_new ();
mounts = gst_rtsp_server_get_mount_points (server);
factory = onvif_factory_new ();
gst_rtsp_media_factory_set_media_gtype (factory, GST_TYPE_RTSP_ONVIF_MEDIA);
gst_rtsp_mount_points_add_factory (mounts, "/test", factory);
g_object_unref (mounts);
gst_rtsp_server_attach (server, NULL);
service = gst_rtsp_server_get_service (server);
g_print ("stream ready at rtsp://127.0.0.1:%s/test\n", service);
g_free (service);
g_main_loop_run (loop);
return 0;
}