diff --git a/examples/meson.build b/examples/meson.build index 332ea666b1..2bacd48274 100644 --- a/examples/meson.build +++ b/examples/meson.build @@ -10,6 +10,8 @@ examples = [ 'test-netclock', 'test-netclock-client', 'test-ogg', + 'test-onvif-client', + 'test-onvif-server', 'test-readme', 'test-record-auth', 'test-record', diff --git a/examples/test-onvif-client.c b/examples/test-onvif-client.c new file mode 100644 index 0000000000..28fe102d50 --- /dev/null +++ b/examples/test-onvif-client.c @@ -0,0 +1,693 @@ +/* GStreamer + * Copyright (C) 2019 Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include + +typedef struct +{ + gchar *range; + gdouble speed; + gchar *frames; + gchar *rate_control; + gboolean reverse; +} SeekParameters; + +typedef struct +{ + GstElement *src; + GstElement *sink; + GstElement *pipe; + SeekParameters *seek_params; + GMainLoop *loop; + GIOChannel *io; + gboolean new_range; + guint io_watch_id; + gboolean reset_sync; +} Context; + +typedef struct +{ + const gchar *name; + gboolean has_argument; + const gchar *help; + gboolean (*func) (Context * ctx, gchar * arg, gboolean * async); +} Command; + +static gboolean cmd_help (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_pause (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_play (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_reverse (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_range (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_speed (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_frames (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_rate_control (Context * ctx, gchar * arg, gboolean * async); +static gboolean cmd_step_forward (Context * ctx, gchar * arg, gboolean * async); + +static Command commands[] = { + {"help", FALSE, "Display list of valid commands", cmd_help}, + {"pause", FALSE, "Pause playback", cmd_pause}, + {"play", FALSE, "Resume playback", cmd_play}, + {"reverse", FALSE, "Reverse playback direction", cmd_reverse}, + {"range", TRUE, + "Seek to the specified range, example: \"range: 19000101T000000Z-19000101T000200Z\"", + cmd_range}, + {"speed", TRUE, "Set the playback speed, example: \"speed: 1.0\"", cmd_speed}, + {"frames", TRUE, + "Set the frames trickmode, example: \"frames: intra\", \"frames: predicted\", \"frames: intra/1000\"", + cmd_frames}, + {"rate-control", TRUE, + "Set the rate control mode, example: \"rate-control: no\"", + cmd_rate_control}, + {"s", FALSE, "Step to the following frame (in current playback direction)", + cmd_step_forward}, + {NULL}, +}; + +static gchar *rtsp_address; + +#define MAKE_AND_ADD(var, pipe, name, label, elem_name) \ +G_STMT_START { \ + if (G_UNLIKELY (!(var = (gst_element_factory_make (name, elem_name))))) { \ + GST_ERROR ("Could not create element %s", name); \ + goto label; \ + } \ + if (G_UNLIKELY (!gst_bin_add (GST_BIN_CAST (pipe), var))) { \ + GST_ERROR ("Could not add element %s", name); \ + goto label; \ + } \ +} G_STMT_END + +#define DEFAULT_RANGE "19000101T000000Z-19000101T000200Z" +#define DEFAULT_SPEED 1.0 +#define DEFAULT_FRAMES "none" +#define DEFAULT_RATE_CONTROL "yes" +#define DEFAULT_REVERSE FALSE + +static void +pad_added_cb (GstElement * src, GstPad * srcpad, GstElement * peer) +{ + GstPad *sinkpad = gst_element_get_static_pad (peer, "sink"); + + gst_pad_link (srcpad, sinkpad); + + gst_object_unref (sinkpad); +} + +static gboolean +setup (Context * ctx) +{ + GstElement *onvifparse, *queue, *vdepay, *vdec, *vconv, *toverlay, *tee, + *vqueue; + gboolean ret = FALSE; + + MAKE_AND_ADD (ctx->src, ctx->pipe, "rtspsrc", done, NULL); + MAKE_AND_ADD (queue, ctx->pipe, "queue", done, NULL); + MAKE_AND_ADD (onvifparse, ctx->pipe, "rtponvifparse", done, NULL); + MAKE_AND_ADD (vdepay, ctx->pipe, "rtph264depay", done, NULL); + MAKE_AND_ADD (vdec, ctx->pipe, "avdec_h264", done, NULL); + MAKE_AND_ADD (vconv, ctx->pipe, "videoconvert", done, NULL); + MAKE_AND_ADD (toverlay, ctx->pipe, "timeoverlay", done, NULL); + MAKE_AND_ADD (tee, ctx->pipe, "tee", done, NULL); + MAKE_AND_ADD (vqueue, ctx->pipe, "queue", done, NULL); + MAKE_AND_ADD (ctx->sink, ctx->pipe, "xvimagesink", done, NULL); + + g_object_set (ctx->src, "location", rtsp_address, NULL); + g_object_set (ctx->src, "onvif-mode", TRUE, NULL); + g_object_set (ctx->src, "tcp-timeout", 0, NULL); + g_object_set (toverlay, "show-times-as-dates", TRUE, NULL); + + g_object_set (toverlay, "datetime-format", "%a %d, %b %Y - %T", NULL); + + g_signal_connect (ctx->src, "pad-added", G_CALLBACK (pad_added_cb), queue); + + if (!gst_element_link_many (queue, onvifparse, vdepay, vdec, vconv, toverlay, + tee, vqueue, ctx->sink, NULL)) { + goto done; + } + + g_object_set (ctx->src, "onvif-rate-control", FALSE, "is-live", FALSE, NULL); + + if (!g_strcmp0 (ctx->seek_params->rate_control, "no")) { + g_object_set (ctx->sink, "sync", FALSE, NULL); + } + + ret = TRUE; + +done: + return ret; +} + +static GstEvent * +translate_seek_parameters (Context * ctx, SeekParameters * seek_params) +{ + GstEvent *ret = NULL; + gchar *range_str = NULL; + GstRTSPTimeRange *rtsp_range; + GstSeekType start_type, stop_type; + GstClockTime start, stop; + gdouble rate; + GstSeekFlags flags; + gchar **split = NULL; + GstClockTime trickmode_interval = 0; + gint64 cur_pos; + + range_str = g_strdup_printf ("clock=%s", seek_params->range); + + if (gst_rtsp_range_parse (range_str, &rtsp_range) != GST_RTSP_OK) { + GST_ERROR ("Failed to parse range %s", range_str); + goto done; + } + + gst_rtsp_range_get_times (rtsp_range, &start, &stop); + + if (start > stop) { + GST_ERROR ("Invalid range, start > stop: %s", seek_params->range); + goto done; + } + + start_type = GST_SEEK_TYPE_SET; + stop_type = GST_SEEK_TYPE_SET; + + if (!ctx->new_range) { + gst_element_query_position (ctx->pipe, GST_FORMAT_TIME, &cur_pos); + if (seek_params->reverse) { + stop_type = GST_SEEK_TYPE_SET; + stop = cur_pos; + } else { + start_type = GST_SEEK_TYPE_SET; + start = cur_pos; + } + } + + ctx->new_range = FALSE; + + flags = GST_SEEK_FLAG_FLUSH; + + split = g_strsplit (seek_params->frames, "/", 2); + + if (!g_strcmp0 (split[0], "intra")) { + if (split[1]) { + guint64 interval; + gchar *end; + + interval = g_ascii_strtoull (split[1], &end, 10); + + if (!end || *end != '\0') { + GST_ERROR ("Unexpected interval value %s", split[1]); + goto done; + } + + trickmode_interval = interval * GST_MSECOND; + } + flags |= GST_SEEK_FLAG_TRICKMODE_KEY_UNITS; + } else if (!g_strcmp0 (split[0], "predicted")) { + if (split[1]) { + GST_ERROR ("Predicted frames mode does not allow an interval (%s)", + seek_params->frames); + goto done; + } + flags |= GST_SEEK_FLAG_TRICKMODE_FORWARD_PREDICTED; + } else if (g_strcmp0 (split[0], "none")) { + GST_ERROR ("Invalid frames mode (%s)", seek_params->frames); + goto done; + } + + if (seek_params->reverse) { + rate = -1.0 * seek_params->speed; + } else { + rate = 1.0 * seek_params->speed; + } + + ret = gst_event_new_seek (rate, GST_FORMAT_TIME, flags, + start_type, start, stop_type, stop); + + if (trickmode_interval) + gst_event_set_seek_trickmode_interval (ret, trickmode_interval); + +done: + if (split) + g_strfreev (split); + g_free (range_str); + return ret; +} + +static void prompt_on (Context * ctx); +static void prompt_off (Context * ctx); + +static gboolean +cmd_help (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = TRUE; + guint i; + + *async = FALSE; + + for (i = 0; commands[i].name; i++) { + g_print ("%s: %s\n", commands[i].name, commands[i].help); + } + + return ret; +} + +static gboolean +cmd_pause (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret; + GstStateChangeReturn state_ret; + + g_print ("Pausing\n"); + + state_ret = gst_element_set_state (ctx->pipe, GST_STATE_PAUSED); + + *async = state_ret == GST_STATE_CHANGE_ASYNC; + ret = state_ret != GST_STATE_CHANGE_FAILURE; + + return ret; +} + +static gboolean +cmd_play (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret; + GstStateChangeReturn state_ret; + + g_print ("Playing\n"); + + state_ret = gst_element_set_state (ctx->pipe, GST_STATE_PLAYING); + + *async = state_ret == GST_STATE_CHANGE_ASYNC; + ret = state_ret != GST_STATE_CHANGE_FAILURE; + + return ret; +} + +static gboolean +do_seek (Context * ctx) +{ + gboolean ret = FALSE; + GstEvent *event; + + if (!(event = translate_seek_parameters (ctx, ctx->seek_params))) { + GST_ERROR ("Failed to create seek event"); + goto done; + } + + if (ctx->seek_params->reverse) + g_object_set (ctx->src, "onvif-rate-control", FALSE, NULL); + + if (ctx->reset_sync) { + g_object_set (ctx->sink, "sync", TRUE, NULL); + ctx->reset_sync = FALSE; + } + + if (!gst_element_send_event (ctx->src, event)) { + GST_ERROR ("Failed to seek rtspsrc"); + g_main_loop_quit (ctx->loop); + goto done; + } + + ret = TRUE; + +done: + return ret; +} + +static gboolean +cmd_reverse (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = TRUE; + + g_print ("Reversing playback direction\n"); + + ctx->seek_params->reverse = !ctx->seek_params->reverse; + + ret = do_seek (ctx); + + *async = ret == TRUE; + + return ret; +} + +static gboolean +cmd_range (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = TRUE; + + g_print ("Switching to new range\n"); + + g_free (ctx->seek_params->range); + ctx->seek_params->range = g_strdup (arg); + ctx->new_range = TRUE; + + ret = do_seek (ctx); + + *async = ret == TRUE; + + return ret; +} + +static gboolean +cmd_speed (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = FALSE; + gchar *endptr = NULL; + gdouble new_speed; + + new_speed = g_ascii_strtod (arg, &endptr); + + g_print ("Switching gears\n"); + + if (endptr == NULL || *endptr != '\0' || new_speed <= 0.0) { + GST_ERROR ("Invalid value for speed: %s", arg); + goto done; + } + + ctx->seek_params->speed = new_speed; + ret = do_seek (ctx); + +done: + *async = ret == TRUE; + return ret; +} + +static gboolean +cmd_frames (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = TRUE; + + g_print ("Changing Frames trickmode\n"); + + g_free (ctx->seek_params->frames); + ctx->seek_params->frames = g_strdup (arg); + ret = do_seek (ctx); + *async = ret == TRUE; + + return ret; +} + +static gboolean +cmd_rate_control (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = FALSE; + + *async = FALSE; + + if (!g_strcmp0 (arg, "no")) { + g_object_set (ctx->sink, "sync", FALSE, NULL); + ret = TRUE; + } else if (!g_strcmp0 (arg, "yes")) { + /* TODO: there probably is a solution that doesn't involve sending + * a request to the server to reset our position */ + ctx->reset_sync = TRUE; + ret = do_seek (ctx); + *async = TRUE; + } else { + GST_ERROR ("Invalid rate-control: %s", arg); + goto done; + } + + ret = TRUE; + +done: + return ret; +} + +static gboolean +cmd_step_forward (Context * ctx, gchar * arg, gboolean * async) +{ + gboolean ret = FALSE; + GstEvent *event; + + event = gst_event_new_step (GST_FORMAT_BUFFERS, 1, 1.0, TRUE, FALSE); + + g_print ("Stepping\n"); + + if (!gst_element_send_event (ctx->sink, event)) { + GST_ERROR ("Failed to step forward"); + goto done; + } + + ret = TRUE; + +done: + *async = ret == TRUE; + return ret; +} + +static void +handle_command (Context * ctx, gchar * cmd) +{ + gchar **split; + guint i; + gboolean valid_command = FALSE; + + split = g_strsplit (cmd, ":", 0); + + cmd = g_strstrip (split[0]); + + if (cmd == NULL || *cmd == '\0') { + g_print ("> "); + goto done; + } + + for (i = 0; commands[i].name; i++) { + if (!g_strcmp0 (commands[i].name, cmd)) { + valid_command = TRUE; + if (commands[i].has_argument && g_strv_length (split) != 2) { + g_print ("Command %s expects exactly one argument:\n%s: %s\n", cmd, + commands[i].name, commands[i].help); + } else if (!commands[i].has_argument && g_strv_length (split) != 1) { + g_print ("Command %s expects no argument:\n%s: %s\n", cmd, + commands[i].name, commands[i].help); + } else { + gboolean async = FALSE; + + if (commands[i].func (ctx, + commands[i].has_argument ? g_strstrip (split[1]) : NULL, &async) + && async) + prompt_off (ctx); + else + g_print ("> "); + } + break; + } + } + + if (!valid_command) { + g_print ("Invalid command %s\n> ", cmd); + } + +done: + g_strfreev (split); +} + +static gboolean +io_callback (GIOChannel * io, GIOCondition condition, Context * ctx) +{ + gboolean ret = TRUE; + gchar *str; + GError *error = NULL; + + switch (condition) { + case G_IO_PRI: + case G_IO_IN: + switch (g_io_channel_read_line (io, &str, NULL, NULL, &error)) { + case G_IO_STATUS_ERROR: + GST_ERROR ("Failed to read commands from stdin: %s", error->message); + g_clear_error (&error); + g_main_loop_quit (ctx->loop); + break; + case G_IO_STATUS_EOF: + g_print ("EOF received, bye\n"); + g_main_loop_quit (ctx->loop); + break; + case G_IO_STATUS_AGAIN: + break; + case G_IO_STATUS_NORMAL: + handle_command (ctx, str); + g_free (str); + break; + } + break; + case G_IO_ERR: + case G_IO_HUP: + GST_ERROR ("Failed to read commands from stdin"); + g_main_loop_quit (ctx->loop); + break; + case G_IO_OUT: + default: + break; + } + + return ret; +} + +static void +prompt_on (Context * ctx) +{ + g_assert (!ctx->io); + ctx->io = g_io_channel_unix_new (STDIN_FILENO); + ctx->io_watch_id = + g_io_add_watch (ctx->io, G_IO_IN, (GIOFunc) io_callback, ctx); + g_print ("> "); +} + +static void +prompt_off (Context * ctx) +{ + g_assert (ctx->io); + g_source_remove (ctx->io_watch_id); + g_io_channel_unref (ctx->io); + ctx->io = NULL; +} + +static gboolean +bus_message_cb (GstBus * bus, GstMessage * message, Context * ctx) +{ + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_STATE_CHANGED:{ + GstState olds, news, pendings; + + if (GST_MESSAGE_SRC (message) == GST_OBJECT (ctx->pipe)) { + gst_message_parse_state_changed (message, &olds, &news, &pendings); + GST_DEBUG_BIN_TO_DOT_FILE (GST_BIN (ctx->pipe), + GST_DEBUG_GRAPH_SHOW_ALL, "playing"); + } + break; + } + case GST_MESSAGE_ERROR:{ + GError *error = NULL; + gchar *debug; + + gst_message_parse_error (message, &error, &debug); + + gst_printerr ("Error: %s (%s)\n", error->message, debug); + g_clear_error (&error); + g_free (debug); + g_main_loop_quit (ctx->loop); + break; + } + case GST_MESSAGE_LATENCY:{ + gst_bin_recalculate_latency (GST_BIN (ctx->pipe)); + break; + } + case GST_MESSAGE_ASYNC_DONE:{ + prompt_on (ctx); + } + default: + break; + } + + return TRUE; +} + +int +main (int argc, char **argv) +{ + GOptionContext *optctx; + Context ctx; + GstBus *bus; + gint ret = 1; + GError *error = NULL; + const gchar *range; + const gchar *frames; + const gchar *rate_control; + SeekParameters seek_params = + { NULL, DEFAULT_SPEED, NULL, NULL, DEFAULT_REVERSE }; + GOptionEntry entries[] = { + {"range", 0, 0, G_OPTION_ARG_STRING, &range, + "Range to seek (default: " DEFAULT_RANGE ")", "RANGE"}, + {"speed", 0, 0, G_OPTION_ARG_DOUBLE, &seek_params.speed, + "Speed to request (default: 1.0)", "SPEED"}, + {"frames", 0, 0, G_OPTION_ARG_STRING, &frames, + "Frames to request (default: none)", "FRAMES"}, + {"rate-control", 0, 0, G_OPTION_ARG_STRING, &rate_control, + "Apply rate control on the client side (default: yes)", "RATE_CONTROL"}, + {"reverse", 0, 0, G_OPTION_ARG_NONE, &seek_params.reverse, + "Playback direction", ""}, + {NULL} + }; + + optctx = g_option_context_new (" - ONVIF RTSP Client"); + g_option_context_add_main_entries (optctx, entries, NULL); + g_option_context_add_group (optctx, gst_init_get_option_group ()); + if (!g_option_context_parse (optctx, &argc, &argv, &error)) { + g_printerr ("Error parsing options: %s\n", error->message); + g_option_context_free (optctx); + g_clear_error (&error); + return -1; + } + if (argc < 2) { + g_print ("%s\n", g_option_context_get_help (optctx, TRUE, NULL)); + return 1; + } + rtsp_address = argv[1]; + g_option_context_free (optctx); + + seek_params.range = g_strdup (range ? range : DEFAULT_RANGE); + seek_params.frames = g_strdup (frames ? frames : DEFAULT_FRAMES); + seek_params.rate_control = + g_strdup (rate_control ? rate_control : DEFAULT_RATE_CONTROL); + + if (seek_params.speed <= 0.0) { + GST_ERROR ("SPEED must be a positive number"); + return 1; + } + + ctx.seek_params = &seek_params; + ctx.new_range = TRUE; + ctx.reset_sync = FALSE; + + ctx.pipe = gst_pipeline_new (NULL); + if (!setup (&ctx)) { + g_printerr ("Damn\n"); + goto done; + } + + g_print ("Type help for the list of available commands\n"); + + do_seek (&ctx); + + ctx.loop = g_main_loop_new (NULL, FALSE); + + bus = gst_pipeline_get_bus (GST_PIPELINE (ctx.pipe)); + gst_bus_add_watch (bus, (GstBusFunc) bus_message_cb, &ctx); + + /* This will make rtspsrc progress to the OPEN state, at which point we can seek it */ + if (!gst_element_set_state (ctx.pipe, GST_STATE_PLAYING)) + goto done; + + g_main_loop_run (ctx.loop); + + g_main_loop_unref (ctx.loop); + + gst_bus_remove_watch (bus); + gst_object_unref (bus); + gst_element_set_state (ctx.pipe, GST_STATE_NULL); + gst_object_unref (ctx.pipe); + + ret = 0; + +done: + g_free (seek_params.range); + g_free (seek_params.frames); + g_free (seek_params.rate_control); + return ret; +} diff --git a/examples/test-onvif-server.c b/examples/test-onvif-server.c new file mode 100644 index 0000000000..ae6ce68a5c --- /dev/null +++ b/examples/test-onvif-server.c @@ -0,0 +1,667 @@ +/* GStreamer + * Copyright (C) 2019 Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + + +#include + +#include + +GST_DEBUG_CATEGORY_STATIC (onvif_server_debug); +#define GST_CAT_DEFAULT (onvif_server_debug) + +#define MAKE_AND_ADD(var, pipe, name, label, elem_name) \ +G_STMT_START { \ + if (G_UNLIKELY (!(var = (gst_element_factory_make (name, elem_name))))) { \ + GST_ERROR ("Could not create element %s", name); \ + goto label; \ + } \ + if (G_UNLIKELY (!gst_bin_add (GST_BIN_CAST (pipe), var))) { \ + GST_ERROR ("Could not add element %s", name); \ + goto label; \ + } \ +} G_STMT_END + +/* This simulates an archive of recordings running from 01-01-1900 to 01-01-2000. + * + * This is implemented by repeating the file provided at the command line, with + * an empty interval of 5 seconds in-between. We intercept relevant events to + * translate them, and update the timestamps on the output buffers. + */ + +#define INTERVAL (5 * GST_SECOND) + +/* January the first, 2000 */ +#define END_DATE 3155673600 * GST_SECOND + + +G_DECLARE_FINAL_TYPE (ReplayBin, replay_bin, REPLAY, BIN, GstBin); + +static gchar *filename; + +struct _ReplayBin +{ + GstBin parent; + + GstEvent *incoming_seek; + GstEvent *outgoing_seek; + GstClockTime trickmode_interval; + + GstSegment segment; + const GstSegment *incoming_segment; + gboolean sent_segment; + GstClockTime ts_offset; + gint64 remainder; + GstClockTime min_pts; +}; + +G_DEFINE_TYPE (ReplayBin, replay_bin, GST_TYPE_BIN); + +static void +replay_bin_init (ReplayBin * self) +{ + self->incoming_seek = NULL; + self->outgoing_seek = NULL; + self->trickmode_interval = 0; + self->ts_offset = 0; + self->sent_segment = FALSE; + self->min_pts = GST_CLOCK_TIME_NONE; +} + +static void +replay_bin_class_init (ReplayBinClass * klass) +{ +} + +static GstElement * +replay_bin_new (void) +{ + return GST_ELEMENT (g_object_new (replay_bin_get_type (), NULL)); +} + +static void +demux_pad_added_cb (GstElement * demux, GstPad * pad, GstGhostPad * ghost) +{ + GstCaps *caps = gst_pad_get_current_caps (pad); + GstStructure *s = gst_caps_get_structure (caps, 0); + + if (gst_structure_has_name (s, "video/x-h264")) { + gst_ghost_pad_set_target (ghost, pad); + } + + gst_caps_unref (caps); +} + +static void +query_seekable (GstPad * ghost, gint64 * start, gint64 * stop) +{ + GstPad *target; + GstQuery *query; + GstFormat format; + gboolean seekable; + + target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost)); + + query = gst_query_new_seeking (GST_FORMAT_TIME); + + gst_pad_query (target, query); + + gst_query_parse_seeking (query, &format, &seekable, start, stop); + + g_assert (seekable); + + gst_object_unref (target); +} + +static GstEvent * +translate_seek (ReplayBin * self, GstPad * pad, GstEvent * ievent) +{ + GstEvent *oevent = NULL; + gdouble rate; + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gint64 istart, istop; /* Incoming */ + gint64 ustart, ustop; /* Upstream */ + gint64 ostart, ostop; /* Outgoing */ + guint32 seqnum = gst_event_get_seqnum (ievent); + + gst_event_parse_seek (ievent, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + if (!GST_CLOCK_TIME_IS_VALID (stop)) + stop = END_DATE; + + gst_event_parse_seek_trickmode_interval (ievent, &self->trickmode_interval); + + istart = start; + istop = stop; + + query_seekable (pad, &ustart, &ustop); + + if (rate > 0) { + /* First, from where we should seek the file */ + ostart = istart % (ustop + INTERVAL); + + /* This may end up in our empty interval */ + if (ostart > ustop) { + istart += ostart - ustop; + ostart = 0; + } + + /* Then, up to where we should seek it */ + ostop = MIN (ustop, ostart + (istop - istart)); + } else { + /* First up to where we should seek the file */ + ostop = istop % (ustop + INTERVAL); + + GST_ERROR ("Ostop is %" GST_TIME_FORMAT, GST_TIME_ARGS (ostop)); + + /* This may end up in our empty interval */ + if (ostop > ustop) { + istop -= ostop - ustop; + ostop = ustop; + } + + ostart = MAX (0, ostop - (istop - istart)); + } + + /* We may be left with nothing to actually play, in this + * case we won't seek upstream, and emit the expected events + * ourselves */ + if (istart > istop) { + GstSegment segment; + GstEvent *event; + gboolean update; + + event = gst_event_new_flush_start (); + gst_event_set_seqnum (event, seqnum); + gst_pad_push_event (pad, event); + + event = gst_event_new_flush_stop (TRUE); + gst_event_set_seqnum (event, seqnum); + gst_pad_push_event (pad, event); + + gst_segment_init (&segment, format); + gst_segment_do_seek (&segment, rate, format, flags, start_type, start, + stop_type, stop, &update); + + event = gst_event_new_segment (&segment); + gst_event_set_seqnum (event, seqnum); + gst_pad_push_event (pad, event); + + event = gst_event_new_eos (); + gst_event_set_seqnum (event, seqnum); + gst_pad_push_event (pad, event); + + goto done; + } + + /* Lastly, how much will remain to play back (this remainder includes the interval) */ + if (stop - start > ostop - ostart) + self->remainder = (stop - start) - (ostop - ostart); + + flags |= GST_SEEK_FLAG_SEGMENT; + + oevent = + gst_event_new_seek (rate, format, flags, start_type, ostart, stop_type, + ostop); + gst_event_set_seek_trickmode_interval (oevent, self->trickmode_interval); + gst_event_set_seqnum (oevent, seqnum); + + GST_ERROR ("Translated event to %" GST_PTR_FORMAT " (remainder: %ld)", oevent, + self->remainder); + +done: + return oevent; +} + +static gboolean +replay_bin_event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + ReplayBin *self = REPLAY_BIN (parent); + gboolean ret = TRUE; + gboolean forward = TRUE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + { + GST_ERROR ("Processing seek event %" GST_PTR_FORMAT, event); + + self->incoming_seek = event; + + gst_event_replace (&self->outgoing_seek, NULL); + self->sent_segment = FALSE; + + event = translate_seek (self, pad, event); + + if (!event) + forward = FALSE; + else + self->outgoing_seek = gst_event_ref (event); + break; + } + default: + break; + } + + if (forward) + return gst_pad_event_default (pad, parent, event); + else + return ret; +} + +static gboolean +replay_bin_query_func (GstPad * pad, GstObject * parent, GstQuery * query) +{ + ReplayBin *self = REPLAY_BIN (parent); + gboolean ret = TRUE; + gboolean forward = TRUE; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_SEEKING: + /* We are seekable from the beginning till the end of time */ + gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0, + GST_CLOCK_TIME_NONE); + forward = FALSE; + break; + case GST_QUERY_SEGMENT: + gst_query_set_segment (query, self->segment.rate, self->segment.format, + self->segment.start, self->segment.stop); + forward = FALSE; + default: + break; + } + + GST_DEBUG ("Processed query %" GST_PTR_FORMAT, query); + + if (forward) + return gst_pad_query_default (pad, parent, query); + else + return ret; +} + +static GstEvent * +translate_segment (GstPad * pad, GstEvent * ievent) +{ + ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad)); + GstEvent *ret; + gdouble irate, orate; + GstFormat iformat, oformat; + GstSeekFlags iflags, oflags; + GstSeekType istart_type, ostart_type, istop_type, ostop_type; + gint64 istart, ostart, istop, ostop; + gboolean update; + + gst_event_parse_segment (ievent, &self->incoming_segment); + + if (!self->outgoing_seek) { + GstSegment segment; + gboolean update; + + gst_segment_init (&segment, GST_FORMAT_TIME); + + gst_segment_do_seek (&segment, 1.0, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, + 0, GST_SEEK_TYPE_SET, END_DATE, &update); + + ret = gst_event_new_segment (&segment); + gst_event_unref (ievent); + goto done; + } + + if (!self->sent_segment) { + gst_event_parse_seek (self->incoming_seek, &irate, &iformat, &iflags, + &istart_type, &istart, &istop_type, &istop); + gst_event_parse_seek (self->outgoing_seek, &orate, &oformat, &oflags, + &ostart_type, &ostart, &ostop_type, &ostop); + + if (istop == -1) + istop = END_DATE; + + if (self->incoming_segment->rate > 0) + self->ts_offset = istart - ostart; + else + self->ts_offset = istop - ostop; + + istart += self->incoming_segment->start - ostart; + istop += self->incoming_segment->stop - ostop; + + gst_segment_init (&self->segment, self->incoming_segment->format); + + gst_segment_do_seek (&self->segment, self->incoming_segment->rate, + self->incoming_segment->format, self->incoming_segment->flags, + GST_SEEK_TYPE_SET, (guint64) istart, GST_SEEK_TYPE_SET, (guint64) istop, + &update); + + self->min_pts = istart; + + ret = gst_event_new_segment (&self->segment); + + self->sent_segment = TRUE; + + gst_event_unref (ievent); + + GST_ERROR ("Translated segment: %" GST_PTR_FORMAT ", ts_offset: %lu", ret, + self->ts_offset); + } else { + ret = NULL; + } + +done: + return ret; +} + +static void +handle_segment_done (ReplayBin * self, GstPad * pad) +{ + GstEvent *event; + + if (self->remainder < INTERVAL) { + self->remainder = 0; + event = gst_event_new_eos (); + gst_event_set_seqnum (event, gst_event_get_seqnum (self->incoming_seek)); + gst_pad_push_event (pad, event); + } else { + gint64 ustart, ustop; + gint64 ostart, ostop; + GstPad *target; + GstStructure *s; + + /* Signify the end of a contiguous section of recording */ + s = gst_structure_new ("GstNtpOffset", + "ntp-offset", G_TYPE_UINT64, 0, "discont", G_TYPE_BOOLEAN, TRUE, NULL); + + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, s); + + gst_pad_push_event (pad, event); + + query_seekable (pad, &ustart, &ustop); + + self->remainder -= INTERVAL; + + if (self->incoming_segment->rate > 0) { + ostart = 0; + ostop = MIN (ustop, self->remainder); + } else { + ostart = MAX (ustop - self->remainder, 0); + ostop = ustop; + } + + self->remainder = MAX (self->remainder - ostop - ostart, 0); + + event = + gst_event_new_seek (self->segment.rate, self->segment.format, + self->segment.flags & ~GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, ostart, + GST_SEEK_TYPE_SET, ostop); + gst_event_set_seek_trickmode_interval (event, self->trickmode_interval); + + if (self->incoming_segment->rate > 0) + self->ts_offset += INTERVAL + ustop; + else + self->ts_offset -= INTERVAL + ustop; + + GST_DEBUG ("New offset: %ld", self->ts_offset); + + GST_DEBUG ("Seeking to %" GST_PTR_FORMAT, event); + target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad)); + gst_pad_send_event (target, event); + gst_object_unref (target); + } +} + +static GstPadProbeReturn +replay_bin_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused) +{ + ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad)); + GstPadProbeReturn ret = GST_PAD_PROBE_OK; + + GST_DEBUG ("Probed %" GST_PTR_FORMAT, info->data); + + switch (GST_EVENT_TYPE (info->data)) { + case GST_EVENT_SEGMENT: + { + GstEvent *translated; + + GST_DEBUG ("Probed segment %" GST_PTR_FORMAT, info->data); + + translated = translate_segment (pad, GST_EVENT (info->data)); + if (translated) + info->data = translated; + else + ret = GST_PAD_PROBE_HANDLED; + + break; + } + case GST_EVENT_SEGMENT_DONE: + { + handle_segment_done (self, pad); + ret = GST_PAD_PROBE_HANDLED; + break; + } + default: + break; + } + + return ret; +} + +static GstPadProbeReturn +replay_bin_buffer_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused) +{ + ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad)); + GstPadProbeReturn ret = GST_PAD_PROBE_OK; + + if (GST_BUFFER_PTS (info->data) > self->incoming_segment->stop) { + ret = GST_PAD_PROBE_DROP; + goto done; + } + + if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (info->data))) + GST_BUFFER_PTS (info->data) += self->ts_offset; + if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (info->data))) + GST_BUFFER_DTS (info->data) += self->ts_offset; + + /* + if (GST_BUFFER_PTS (info->data) < self->min_pts) { + GST_ERROR ("Retimestamping for the greater good"); + GST_BUFFER_PTS (info->data) = self->min_pts; + self->min_pts += 1; + } + */ + + GST_LOG ("Pushing buffer %" GST_PTR_FORMAT, info->data); + +done: + return ret; +} + +static GstElement * +create_replay_bin (GstElement * parent) +{ + GstElement *ret, *src, *demux; + GstPad *ghost; + + ret = replay_bin_new (); + if (!gst_bin_add (GST_BIN (parent), ret)) { + gst_object_unref (ret); + goto fail; + } + + MAKE_AND_ADD (src, ret, "filesrc", fail, NULL); + MAKE_AND_ADD (demux, ret, "qtdemux", fail, NULL); + + ghost = gst_ghost_pad_new_no_target ("src", GST_PAD_SRC); + gst_element_add_pad (ret, ghost); + + gst_pad_set_event_function (ghost, replay_bin_event_func); + gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + replay_bin_event_probe, NULL, NULL); + gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_BUFFER, replay_bin_buffer_probe, + NULL, NULL); + gst_pad_set_query_function (ghost, replay_bin_query_func); + + if (!gst_element_link (src, demux)) + goto fail; + + g_object_set (src, "location", filename, NULL); + g_signal_connect (demux, "pad-added", G_CALLBACK (demux_pad_added_cb), ghost); + +done: + return ret; + +fail: + ret = NULL; + goto done; +} + +/* A simple factory to set up our replay bin */ + +G_DECLARE_FINAL_TYPE (OnvifFactory, onvif_factory, ONVIF, FACTORY, + GstRTSPMediaFactory); + +struct _OnvifFactory +{ + GstRTSPMediaFactory parent; +}; + +G_DEFINE_TYPE (OnvifFactory, onvif_factory, GST_TYPE_RTSP_MEDIA_FACTORY); + +static void +onvif_factory_init (OnvifFactory * factory) +{ +} + +static GstElement * +onvif_factory_create_element (GstRTSPMediaFactory * factory, + const GstRTSPUrl * url) +{ + GstElement *replay_bin, *q1, *parse, *pay, *onvifts, *q2; + GstElement *ret = gst_bin_new (NULL); + GstElement *pbin = gst_bin_new ("pay0"); + GstPad *sinkpad, *srcpad; + + if (!(replay_bin = create_replay_bin (ret))) + goto fail; + + MAKE_AND_ADD (q1, pbin, "queue", fail, NULL); + MAKE_AND_ADD (parse, pbin, "h264parse", fail, NULL); + MAKE_AND_ADD (pay, pbin, "rtph264pay", fail, NULL); + MAKE_AND_ADD (onvifts, pbin, "rtponviftimestamp", fail, NULL); + MAKE_AND_ADD (q2, pbin, "queue", fail, NULL); + + gst_bin_add (GST_BIN (ret), pbin); + + if (!gst_element_link_many (q1, parse, pay, onvifts, q2, NULL)) + goto fail; + + sinkpad = gst_element_get_static_pad (q1, "sink"); + gst_element_add_pad (pbin, gst_ghost_pad_new ("sink", sinkpad)); + gst_object_unref (sinkpad); + + if (!gst_element_link (replay_bin, pbin)) + goto fail; + + srcpad = gst_element_get_static_pad (q2, "src"); + gst_element_add_pad (pbin, gst_ghost_pad_new ("src", srcpad)); + gst_object_unref (srcpad); + + g_object_set (onvifts, "set-t-bit", TRUE, "set-e-bit", TRUE, "ntp-offset", 0, + "drop-out-of-segment", FALSE, NULL); + + gst_element_set_clock (onvifts, gst_system_clock_obtain ()); + +done: + return ret; + +fail: + gst_object_unref (ret); + ret = NULL; + goto done; +} + +static void +onvif_factory_class_init (OnvifFactoryClass * klass) +{ + GstRTSPMediaFactoryClass *mf_class = GST_RTSP_MEDIA_FACTORY_CLASS (klass); + + mf_class->create_element = onvif_factory_create_element; +} + +static GstRTSPMediaFactory * +onvif_factory_new (void) +{ + GstRTSPMediaFactory *result; + + result = + GST_RTSP_MEDIA_FACTORY (g_object_new (onvif_factory_get_type (), NULL)); + + return result; +} + +int +main (int argc, char *argv[]) +{ + GMainLoop *loop; + GstRTSPServer *server; + GstRTSPMountPoints *mounts; + GstRTSPMediaFactory *factory; + GOptionContext *optctx; + GError *error = NULL; + gchar *service; + + optctx = g_option_context_new (" - ONVIF RTSP Server, MP4"); + g_option_context_add_group (optctx, gst_init_get_option_group ()); + if (!g_option_context_parse (optctx, &argc, &argv, &error)) { + g_printerr ("Error parsing options: %s\n", error->message); + g_option_context_free (optctx); + g_clear_error (&error); + return -1; + } + if (argc < 2) { + g_print ("%s\n", g_option_context_get_help (optctx, TRUE, NULL)); + return 1; + } + filename = argv[1]; + g_option_context_free (optctx); + + GST_DEBUG_CATEGORY_INIT (onvif_server_debug, "onvif-server", 0, + "ONVIF server"); + + loop = g_main_loop_new (NULL, FALSE); + + server = gst_rtsp_onvif_server_new (); + + mounts = gst_rtsp_server_get_mount_points (server); + + factory = onvif_factory_new (); + gst_rtsp_media_factory_set_media_gtype (factory, GST_TYPE_RTSP_ONVIF_MEDIA); + + gst_rtsp_mount_points_add_factory (mounts, "/test", factory); + + g_object_unref (mounts); + + gst_rtsp_server_attach (server, NULL); + + service = gst_rtsp_server_get_service (server); + g_print ("stream ready at rtsp://127.0.0.1:%s/test\n", service); + g_free (service); + g_main_loop_run (loop); + + return 0; +}