From e97877dc7e107f0ae514325a86c0cbac77653e9b Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Wed, 5 Jul 2017 16:56:24 +0300 Subject: [PATCH] tests/examples: add manual tests/examples for the ipcpipeline elements ipcpipeline1 is a very simple test that shows a short videotestsrc fragment. ipc-play is a clone of gst-play that splits the pipeline in two processes, running the source & demuxer on the master process and the decoders & sinks on the slave. --- .gitignore | 2 + configure.ac | 1 + tests/examples/Makefile.am | 2 +- tests/examples/ipcpipeline/Makefile.am | 11 + tests/examples/ipcpipeline/ipc-play.c | 989 ++++++++++++++++++++++ tests/examples/ipcpipeline/ipcpipeline1.c | 183 ++++ 6 files changed, 1187 insertions(+), 1 deletion(-) create mode 100644 tests/examples/ipcpipeline/Makefile.am create mode 100644 tests/examples/ipcpipeline/ipc-play.c create mode 100644 tests/examples/ipcpipeline/ipcpipeline1.c diff --git a/.gitignore b/.gitignore index 297267d68b..5a64122c80 100644 --- a/.gitignore +++ b/.gitignore @@ -55,6 +55,8 @@ gst*orc.h /tests/check/pipelines/simple-launch-lines /tests/check/pipelines/ipcpipeline /tests/examples/audiomixmatrix/test-audiomixmatrix +/tests/examples/ipcpipeline/ipc-play +/tests/examples/ipcpipeline/ipcpipeline1 /tests/examples/codecparsers/parse-jpeg /tests/examples/codecparsers/parse-vp8 /tests/examples/gtk/gtkglsink diff --git a/configure.ac b/configure.ac index 42d54c9d88..e06932f689 100644 --- a/configure.ac +++ b/configure.ac @@ -3702,6 +3702,7 @@ tests/examples/gl/gtk/switchvideooverlay/Makefile tests/examples/gl/qt/Makefile tests/examples/gl/sdl/Makefile tests/examples/gtk/Makefile +tests/examples/ipcpipeline/Makefile tests/examples/mpegts/Makefile tests/examples/mxf/Makefile tests/examples/opencv/Makefile diff --git a/tests/examples/Makefile.am b/tests/examples/Makefile.am index c9b5e0b576..3649a500ba 100644 --- a/tests/examples/Makefile.am +++ b/tests/examples/Makefile.am @@ -61,6 +61,6 @@ playout_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-$(GST_API_VERSION) $(GST_LIB SUBDIRS= codecparsers mpegts $(DIRECTFB_DIR) $(GTK_EXAMPLES) $(OPENCV_EXAMPLES) \ $(GL_DIR) $(GTK3_DIR) $(AVSAMPLE_DIR) $(WAYLAND_DIR) $(MATRIXMIX_DIR) DIST_SUBDIRS= codecparsers mpegts camerabin2 directfb mxf opencv uvch264 gl gtk \ - avsamplesink waylandsink audiomixmatrix + avsamplesink waylandsink audiomixmatrix ipcpipeline include $(top_srcdir)/common/parallel-subdirs.mak diff --git a/tests/examples/ipcpipeline/Makefile.am b/tests/examples/ipcpipeline/Makefile.am new file mode 100644 index 0000000000..c759aa4a65 --- /dev/null +++ b/tests/examples/ipcpipeline/Makefile.am @@ -0,0 +1,11 @@ +noinst_PROGRAMS = ipcpipeline1 \ + ipc-play + +ipcpipeline1_SOURCES = ipcpipeline1.c +ipcpipeline1_CFLAGS = $(GST_CFLAGS) $(GST_BASE_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) +ipcpipeline1_LDFLAGS = $(GST_LIBS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) $(GSTPB_BASE_LIBS) + +ipc_play_SOURCES = ipc-play.c +ipc_play_CFLAGS = $(GST_CFLAGS) $(GST_BASE_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) +ipc_play_LDFLAGS = $(GST_LIBS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) $(GSTPB_BASE_LIBS) \ + -lgstvideo-$(GST_API_VERSION) diff --git a/tests/examples/ipcpipeline/ipc-play.c b/tests/examples/ipcpipeline/ipc-play.c new file mode 100644 index 0000000000..e41c07a8ed --- /dev/null +++ b/tests/examples/ipcpipeline/ipc-play.c @@ -0,0 +1,989 @@ +/* GStreamer + * + * example program for the ipcpipelinesrc/ipcpipelinesink elements + * + * Copyright (C) 2013-2014 Tim-Philipp Müller + * Copyright (C) 2013 Collabora Ltd. + * Copyright (C) 2015 Centricular Ltd + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: George Kiagiadakis + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/* + * Based on gst-play and ipcpipeline1. This program will play any URI + * while splitting the pipeline in two processes, running the source & demuxer + * on the master process and the decoders & sinks on the slave. + * See keyboard_cb() for the various keyboard shortcuts you can use to + * interract with it while the video window is focused. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static GMainLoop *loop; +static int pipes[2] = { -1, -1 }; + +static const char *arg_video_sink = "autovideosink"; +static const char *arg_audio_sink = "autoaudiosink"; + +/******* MASTER *******/ + +#define GST_PLAY_KB_ARROW_UP "\033[A" +#define GST_PLAY_KB_ARROW_DOWN "\033[B" +#define GST_PLAY_KB_ARROW_RIGHT "\033[C" +#define GST_PLAY_KB_ARROW_LEFT "\033[D" + +typedef enum +{ + GST_PLAY_TRICK_MODE_NONE = 0, + GST_PLAY_TRICK_MODE_DEFAULT, + GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO, + GST_PLAY_TRICK_MODE_KEY_UNITS, + GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO, + GST_PLAY_TRICK_MODE_LAST +} GstPlayTrickMode; + +static GstPlayTrickMode trick_mode = GST_PLAY_TRICK_MODE_NONE; +static gdouble cur_rate = 1.0; +static gboolean buffering = FALSE; +static GstState desired_state = GST_STATE_PLAYING; + +static gboolean play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate, + GstPlayTrickMode mode); + +static void +toggle_paused (GstElement * pipeline) +{ + if (desired_state == GST_STATE_PLAYING) + desired_state = GST_STATE_PAUSED; + else + desired_state = GST_STATE_PLAYING; + + if (!buffering) { + gst_element_set_state (pipeline, desired_state); + } else if (desired_state == GST_STATE_PLAYING) { + g_print ("\nWill play as soon as buffering finishes)\n"); + } +} + +static void +relative_seek (GstElement * pipeline, gdouble percent) +{ + GstQuery *query; + gboolean seekable = FALSE; + gint64 dur = -1, pos = -1, step; + + g_return_if_fail (percent >= -1.0 && percent <= 1.0); + + if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos)) + goto seek_failed; + + query = gst_query_new_seeking (GST_FORMAT_TIME); + if (!gst_element_query (pipeline, query)) { + gst_query_unref (query); + goto seek_failed; + } + + gst_query_parse_seeking (query, NULL, &seekable, NULL, &dur); + gst_query_unref (query); + + if (!seekable || dur <= 0) + goto seek_failed; + + step = dur * percent; + if (ABS (step) < GST_SECOND) + step = (percent < 0) ? -GST_SECOND : GST_SECOND; + + pos = pos + step; + if (pos > dur) { + g_print ("\nReached end of play list.\n"); + g_main_loop_quit (loop); + } else { + if (pos < 0) + pos = 0; + + play_do_seek (pipeline, pos, cur_rate, trick_mode); + } + + return; + +seek_failed: + { + g_print ("\nCould not seek.\n"); + } +} + +static gboolean +play_set_rate_and_trick_mode (GstElement * pipeline, gdouble rate, + GstPlayTrickMode mode) +{ + gint64 pos = -1; + + g_return_val_if_fail (rate != 0, FALSE); + + if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos)) + return FALSE; + + return play_do_seek (pipeline, pos, rate, mode); +} + +static gboolean +play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate, + GstPlayTrickMode mode) +{ + GstSeekFlags seek_flags; + GstQuery *query; + GstEvent *seek; + gboolean seekable = FALSE; + + query = gst_query_new_seeking (GST_FORMAT_TIME); + if (!gst_element_query (pipeline, query)) { + gst_query_unref (query); + return FALSE; + } + + gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL); + gst_query_unref (query); + + if (!seekable) + return FALSE; + + seek_flags = GST_SEEK_FLAG_FLUSH; + + switch (mode) { + case GST_PLAY_TRICK_MODE_DEFAULT: + seek_flags |= GST_SEEK_FLAG_TRICKMODE; + break; + case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO: + seek_flags |= GST_SEEK_FLAG_TRICKMODE | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO; + break; + case GST_PLAY_TRICK_MODE_KEY_UNITS: + seek_flags |= GST_SEEK_FLAG_TRICKMODE_KEY_UNITS; + break; + case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO: + seek_flags |= + GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO; + break; + case GST_PLAY_TRICK_MODE_NONE: + default: + break; + } + + if (rate >= 0) + seek = gst_event_new_seek (rate, GST_FORMAT_TIME, + seek_flags | GST_SEEK_FLAG_ACCURATE, + /* start */ GST_SEEK_TYPE_SET, pos, + /* stop */ GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE); + else + seek = gst_event_new_seek (rate, GST_FORMAT_TIME, + seek_flags | GST_SEEK_FLAG_ACCURATE, + /* start */ GST_SEEK_TYPE_SET, 0, + /* stop */ GST_SEEK_TYPE_SET, pos); + + if (!gst_element_send_event (pipeline, seek)) + return FALSE; + + cur_rate = rate; + trick_mode = mode; + return TRUE; +} + +static void +play_set_playback_rate (GstElement * pipeline, gdouble rate) +{ + if (play_set_rate_and_trick_mode (pipeline, rate, trick_mode)) { + g_print ("Playback rate: %.2f", rate); + g_print (" \n"); + } else { + g_print ("\n"); + g_print ("Could not change playback rate to %.2f", rate); + g_print (".\n"); + } +} + +static void +play_set_relative_playback_rate (GstElement * pipeline, gdouble rate_step, + gboolean reverse_direction) +{ + gdouble new_rate = cur_rate + rate_step; + + if (reverse_direction) + new_rate *= -1.0; + + play_set_playback_rate (pipeline, new_rate); +} + +static const gchar * +trick_mode_get_description (GstPlayTrickMode mode) +{ + switch (mode) { + case GST_PLAY_TRICK_MODE_NONE: + return "normal playback, trick modes disabled"; + case GST_PLAY_TRICK_MODE_DEFAULT: + return "trick mode: default"; + case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO: + return "trick mode: default, no audio"; + case GST_PLAY_TRICK_MODE_KEY_UNITS: + return "trick mode: key frames only"; + case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO: + return "trick mode: key frames only, no audio"; + default: + break; + } + return "unknown trick mode"; +} + +static void +play_switch_trick_mode (GstElement * pipeline) +{ + GstPlayTrickMode new_mode = ++trick_mode; + const gchar *mode_desc; + + if (new_mode == GST_PLAY_TRICK_MODE_LAST) + new_mode = GST_PLAY_TRICK_MODE_NONE; + + mode_desc = trick_mode_get_description (new_mode); + + if (play_set_rate_and_trick_mode (pipeline, cur_rate, new_mode)) { + g_print ("Rate: %.2f (%s) \n", cur_rate, mode_desc); + } else { + g_print ("\nCould not change trick mode to %s.\n", mode_desc); + } +} + +static void +keyboard_cb (const gchar * key_input, GstElement * pipeline) +{ + gchar key = '\0'; + + /* only want to switch/case on single char, not first char of string */ + if (key_input[0] != '\0' && key_input[1] == '\0') + key = g_ascii_tolower (key_input[0]); + + switch (key) { + case ' ': + toggle_paused (pipeline); + break; + case 'q': + case 'Q': + g_main_loop_quit (loop); + break; + case 'p': + if (cur_rate > -0.2 && cur_rate < 0.0) + play_set_relative_playback_rate (pipeline, 0.0, TRUE); + else if (ABS (cur_rate) < 2.0) + play_set_relative_playback_rate (pipeline, 0.1, FALSE); + else if (ABS (cur_rate) < 4.0) + play_set_relative_playback_rate (pipeline, 0.5, FALSE); + else + play_set_relative_playback_rate (pipeline, 1.0, FALSE); + break; + case 'o': + if (cur_rate > 0.0 && cur_rate < 0.20) + play_set_relative_playback_rate (pipeline, 0.0, TRUE); + else if (ABS (cur_rate) <= 2.0) + play_set_relative_playback_rate (pipeline, -0.1, FALSE); + else if (ABS (cur_rate) <= 4.0) + play_set_relative_playback_rate (pipeline, -0.5, FALSE); + else + play_set_relative_playback_rate (pipeline, -1.0, FALSE); + break; + case 'd': + play_set_relative_playback_rate (pipeline, 0.0, TRUE); + break; + case 't': + play_switch_trick_mode (pipeline); + break; + case 27: /* ESC */ + if (key_input[1] == '\0') { + g_main_loop_quit (loop); + break; + } + case '0': + play_do_seek (pipeline, 0, cur_rate, trick_mode); + break; + case 'r': + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.requested"); + break; + default: + if (strcmp (key_input, GST_PLAY_KB_ARROW_RIGHT) == 0) { + relative_seek (pipeline, +0.08); + } else if (strcmp (key_input, GST_PLAY_KB_ARROW_LEFT) == 0) { + relative_seek (pipeline, -0.01); + } else { + GST_INFO ("keyboard input:"); + for (; *key_input != '\0'; ++key_input) + GST_INFO (" code %3d", *key_input); + } + break; + } +} + +static gboolean +master_bus_msg (GstBus * bus, GstMessage * msg, gpointer data) +{ + GstPipeline *pipeline = data; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR:{ + GError *err; + gchar *dbg; + + gst_message_parse_error (msg, &err, &dbg); + g_printerr ("MASTER: ERROR: %s\n", err->message); + if (dbg != NULL) + g_printerr ("MASTER: ERROR debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.error"); + + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_WARNING:{ + GError *err; + gchar *dbg; + + gst_message_parse_warning (msg, &err, &dbg); + g_printerr ("MASTER: WARNING: %s\n", err->message); + if (dbg != NULL) + g_printerr ("MASTER: WARNING debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.warning"); + break; + } + case GST_MESSAGE_ASYNC_DONE: + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.async-done"); + break; + case GST_MESSAGE_EOS: + g_print ("EOS on master\n"); + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL); + g_main_loop_quit (loop); + break; + case GST_MESSAGE_BUFFERING:{ + gint percent; + GstBufferingMode bufmode; + + if (!buffering) + g_print ("\n"); + + gst_message_parse_buffering (msg, &percent); + g_print ("%s %d%% \r", "Buffering...", percent); + + gst_message_parse_buffering_stats (msg, &bufmode, NULL, NULL, NULL); + + /* no state management needed for live pipelines */ + if (bufmode != GST_BUFFERING_LIVE) { + if (percent == 100) { + /* a 100% message means buffering is done */ + if (buffering) { + buffering = FALSE; + gst_element_set_state (GST_ELEMENT (pipeline), desired_state); + g_print ("\n%s\n", gst_element_state_get_name (desired_state)); + } + } else { + /* buffering... */ + if (!buffering) { + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED); + buffering = TRUE; + } + } + } + break; + } + case GST_MESSAGE_CLOCK_LOST:{ + g_print ("Clock lost, selecting a new one\n"); + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED); + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING); + break; + } + case GST_MESSAGE_LATENCY: + { + gst_bin_recalculate_latency (GST_BIN (pipeline)); + break; + } + case GST_MESSAGE_REQUEST_STATE:{ + GstState state; + gchar *name; + + name = gst_object_get_path_string (GST_MESSAGE_SRC (msg)); + + gst_message_parse_request_state (msg, &state); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.reqstate"); + + g_print ("Setting state to %s as requested by %s...\n", + gst_element_state_get_name (state), name); + + gst_element_set_state (GST_ELEMENT (pipeline), state); + g_free (name); + break; + } + case GST_MESSAGE_ELEMENT: + { + GstNavigationMessageType mtype = gst_navigation_message_get_type (msg); + if (mtype == GST_NAVIGATION_MESSAGE_EVENT) { + GstEvent *ev = NULL; + + if (gst_navigation_message_parse_event (msg, &ev)) { + GstNavigationEventType e_type = gst_navigation_event_get_type (ev); + switch (e_type) { + case GST_NAVIGATION_EVENT_KEY_PRESS: + { + const gchar *key; + + if (gst_navigation_event_parse_key_event (ev, &key)) { + GST_INFO ("Key press: %s", key); + + if (strcmp (key, "Left") == 0) + key = GST_PLAY_KB_ARROW_LEFT; + else if (strcmp (key, "Right") == 0) + key = GST_PLAY_KB_ARROW_RIGHT; + else if (strcmp (key, "Up") == 0) + key = GST_PLAY_KB_ARROW_UP; + else if (strcmp (key, "Down") == 0) + key = GST_PLAY_KB_ARROW_DOWN; + else if (strcmp (key, "space") == 0) + key = " "; + else if (strlen (key) > 1) + break; + + keyboard_cb (key, GST_ELEMENT (pipeline)); + } + break; + } + case GST_NAVIGATION_EVENT_MOUSE_BUTTON_PRESS: + { + gint button; + if (gst_navigation_event_parse_mouse_button_event (ev, &button, + NULL, NULL)) { + if (button == 4) { + /* wheel up */ + relative_seek (GST_ELEMENT (pipeline), +0.08); + } else if (button == 5) { + /* wheel down */ + relative_seek (GST_ELEMENT (pipeline), -0.01); + } + } + break; + } + default: + break; + } + } + if (ev) + gst_event_unref (ev); + } + break; + } + default: + break; + } + return TRUE; +} + +static int +sendfd (int s, int fd) +{ + char buf[1]; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + int n; + char cms[CMSG_SPACE (sizeof (int))]; + + buf[0] = 0; + iov.iov_base = buf; + iov.iov_len = 1; + + memset (&msg, 0, sizeof msg); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = (caddr_t) cms; + msg.msg_controllen = CMSG_LEN (sizeof (int)); + + cmsg = CMSG_FIRSTHDR (&msg); + cmsg->cmsg_len = CMSG_LEN (sizeof (int)); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + memmove (CMSG_DATA (cmsg), &fd, sizeof (int)); + + if ((n = sendmsg (s, &msg, 0)) != iov.iov_len) + return -1; + return 0; +} + +static gint +find_ipcpipelinesink (gconstpointer e, gconstpointer c) +{ + const GValue *elem = e; + const gchar *caps_name = c; + const gchar *n = g_object_get_data (g_value_get_object (elem), + "ipcpipelinesink-caps-name"); + return g_strcmp0 (caps_name, n); +} + +/* in HLS the decodebin pads are destroyed and re-created every time + * the stream changes bitrate. This trick here ensures that the new + * pads that will appear will go and link to the same ipcpipelinesinks, + * avoiding the creation of new pipelines in the slave. */ +static void +on_pad_unlinked (GstPad * pad, GstPad * peer, GstElement * pipeline) +{ + GstCaps *caps; + const GstStructure *structure; + + caps = gst_pad_get_current_caps (pad); + structure = gst_caps_get_structure (caps, 0); + + g_object_set_data_full (G_OBJECT (GST_OBJECT_PARENT (peer)), + "ipcpipelinesink-caps-name", + g_strdup (gst_structure_get_name (structure)), g_free); +} + +static void +on_pad_added (GstElement * element, GstPad * pad, GstElement * pipeline) +{ + GstElement *ipcpipelinesink; + GstPad *sinkpad; + GstCaps *caps; + const GstStructure *structure; + GstIterator *it; + GValue elem = G_VALUE_INIT; + int sockets[2]; + gboolean create_sockets; + + caps = gst_pad_get_current_caps (pad); + structure = gst_caps_get_structure (caps, 0); + + it = gst_bin_iterate_sinks (GST_BIN (pipeline)); + if (gst_iterator_find_custom (it, find_ipcpipelinesink, &elem, + (gpointer) gst_structure_get_name (structure))) { + ipcpipelinesink = g_value_get_object (&elem); + create_sockets = FALSE; + g_value_reset (&elem); + } else { + ipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL); + gst_bin_add (GST_BIN (pipeline), ipcpipelinesink); + create_sockets = TRUE; + } + + sinkpad = gst_element_get_static_pad (ipcpipelinesink, "sink"); + if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) { + fprintf (stderr, "Failed to link ipcpipelinesink\n"); + exit (1); + } + gst_object_unref (sinkpad); + + g_signal_connect (pad, "unlinked", (GCallback) on_pad_unlinked, pipeline); + + if (create_sockets) { + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sockets)) { + fprintf (stderr, "Error creating sockets: %s\n", strerror (errno)); + exit (1); + } + g_object_set (ipcpipelinesink, "fdin", sockets[0], "fdout", sockets[0], + NULL); + + printf ("new socket %d\n", sockets[1]); + sendfd (pipes[1], sockets[1]); + } + + gst_element_set_state (ipcpipelinesink, GST_STATE_PLAYING); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "pad.added"); +} + +typedef enum +{ + GST_AUTOPLUG_SELECT_TRY, + GST_AUTOPLUG_SELECT_EXPOSE, + GST_AUTOPLUG_SELECT_SKIP +} GstAutoplugSelectResult; + +static GstAutoplugSelectResult +on_autoplug_select (GstElement * uridecodebin, GstPad * pad, GstCaps * caps, + GstElementFactory * factory, GstElement * pipeline) +{ + /* if decodebin is about to plug a decoder, + * stop it right there and expose the pad; + * the slave's decodebin will take it from there... */ + if (gst_element_factory_list_is_type (factory, + GST_ELEMENT_FACTORY_TYPE_DECODER)) { + gchar *capsstr = gst_caps_to_string (caps); + g_print (" exposing to slave: %s\n", capsstr); + g_free (capsstr); + return GST_AUTOPLUG_SELECT_EXPOSE; + } + return GST_AUTOPLUG_SELECT_TRY; +} + +static void +start_source (const gchar * uri) +{ + GstElement *pipeline; + GstElement *uridecodebin; + GstBus *bus; + + pipeline = gst_pipeline_new (NULL); + + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + gst_bus_add_watch (bus, master_bus_msg, pipeline); + gst_object_unref (bus); + + uridecodebin = gst_element_factory_make ("uridecodebin", NULL); + g_object_set (uridecodebin, "uri", uri, NULL); + g_signal_connect (uridecodebin, "pad-added", G_CALLBACK (on_pad_added), + pipeline); + g_signal_connect (uridecodebin, "autoplug-select", + G_CALLBACK (on_autoplug_select), pipeline); + + gst_bin_add (GST_BIN (pipeline), uridecodebin); + gst_element_set_state (pipeline, GST_STATE_PLAYING); +} + +/*********** SLAVE ***********/ + +static gboolean +slave_bus_msg (GstBus * bus, GstMessage * msg, gpointer data) +{ + GstPipeline *pipeline = data; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR:{ + GError *err; + gchar *dbg; + + gst_message_parse_error (msg, &err, &dbg); + g_printerr ("SLAVE: ERROR: %s\n", err->message); + if (dbg != NULL) + g_printerr ("SLAVE: ERROR debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.error"); + break; + } + case GST_MESSAGE_WARNING:{ + GError *err; + gchar *dbg; + + gst_message_parse_warning (msg, &err, &dbg); + g_printerr ("SLAVE: WARNING: %s\n", err->message); + if (dbg != NULL) + g_printerr ("SLAVE: WARNING debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.warning"); + break; + } + case GST_MESSAGE_ASYNC_START: + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.async-start"); + break; + case GST_MESSAGE_ASYNC_DONE: + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.async-done"); + break; + default: + break; + } + return TRUE; +} + +static void +on_decoded_pad_added (GstElement * element, GstPad * pad, gpointer data) +{ + GstBin *pipeline = data; + GstCaps *caps; + GstPad *cpad; + const gchar *type; + gchar *capsstr; + + caps = gst_pad_get_current_caps (pad); + capsstr = gst_caps_to_string (caps); + printf (" caps: %s\n", capsstr); + g_free (capsstr); + + type = gst_structure_get_name (gst_caps_get_structure (caps, 0)); + if (!strcmp (type, "video/x-raw")) { + GstElement *c, *s; + c = gst_element_factory_make ("videoconvert", NULL); + s = gst_element_factory_make (arg_video_sink, NULL); + gst_bin_add_many (GST_BIN (pipeline), c, s, NULL); + gst_element_link_many (c, s, NULL); + cpad = gst_element_get_static_pad (c, "sink"); + gst_pad_link (pad, cpad); + gst_object_unref (cpad); + gst_element_set_state (s, GST_STATE_PLAYING); + gst_element_set_state (c, GST_STATE_PLAYING); + } else if (!strcmp (type, "audio/x-raw")) { + GstElement *c, *s; + c = gst_element_factory_make ("audioconvert", NULL); + s = gst_element_factory_make (arg_audio_sink, NULL); + gst_bin_add_many (GST_BIN (pipeline), c, s, NULL); + gst_element_link_many (c, s, NULL); + cpad = gst_element_get_static_pad (c, "sink"); + gst_pad_link (pad, cpad); + gst_object_unref (cpad); + gst_element_set_state (s, GST_STATE_PLAYING); + gst_element_set_state (c, GST_STATE_PLAYING); + } else { + GstElement *s; + s = gst_element_factory_make ("fakesink", NULL); + g_object_set (s, "sync", TRUE, "async", TRUE, NULL); + gst_bin_add_many (GST_BIN (pipeline), s, NULL); + cpad = gst_element_get_static_pad (s, "sink"); + gst_pad_link (pad, cpad); + gst_object_unref (cpad); + gst_element_set_state (s, GST_STATE_PLAYING); + } + + gst_caps_unref (caps); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "decoded.pad.added"); +} + +static int +recvfd (int s) +{ + int n; + int fd; + char buf[1]; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + char cms[CMSG_SPACE (sizeof (int))]; + + iov.iov_base = buf; + iov.iov_len = 1; + + memset (&msg, 0, sizeof msg); + msg.msg_name = 0; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + msg.msg_control = (caddr_t) cms; + msg.msg_controllen = sizeof cms; + + if ((n = recvmsg (s, &msg, 0)) < 0) + return -1; + if (n == 0) { + perror ("unexpected EOF"); + return -1; + } + cmsg = CMSG_FIRSTHDR (&msg); + memmove (&fd, CMSG_DATA (cmsg), sizeof (int)); + return fd; +} + +static gboolean +pipe_reader (gpointer data) +{ + GstElement *pipeline = data; + GstElement *ipcpipelinesrc, *mq, *decodebin; + GstPad *rpad, *sink_pad; + int fd; + fd_set set; + struct timeval tv; + int ret; + static int idx = 0; + char name[32]; + + FD_ZERO (&set); + FD_SET (pipes[0], &set); + tv.tv_sec = tv.tv_usec = 0; + ret = select (pipes[0] + 1, &set, NULL, NULL, &tv); + if (ret < 0) { + fprintf (stderr, "Failed to select: %s\n", strerror (errno)); + return TRUE; + } + if (!FD_ISSET (pipes[0], &set)) + return TRUE; + + fd = recvfd (pipes[0]); + ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL); + gst_bin_add (GST_BIN (pipeline), ipcpipelinesrc); + g_object_set (ipcpipelinesrc, "fdin", fd, "fdout", fd, NULL); + + + mq = gst_bin_get_by_name (GST_BIN (pipeline), "mq"); + if (!mq) { + fprintf (stderr, "Failed to get mq\n"); + return TRUE; + } + if (!gst_element_link (ipcpipelinesrc, mq)) { + fprintf (stderr, "Failed to link ipcpipelinesrc and mq\n"); + return TRUE; + } + + snprintf (name, sizeof (name), "src_%u", idx++); + rpad = gst_element_get_static_pad (mq, name); + if (!rpad) { + fprintf (stderr, "Failed to get mq request pad\n"); + return TRUE; + } + + decodebin = gst_element_factory_make ("decodebin", NULL); + gst_bin_add (GST_BIN (pipeline), decodebin); + sink_pad = gst_element_get_static_pad (decodebin, "sink"); + gst_pad_link (rpad, sink_pad); + gst_object_unref (sink_pad); + + g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_decoded_pad_added), + pipeline); + + /* dynamically added elements should be synced manually + * to the state of the slave pipeline */ + gst_element_sync_state_with_parent (ipcpipelinesrc); + gst_element_sync_state_with_parent (decodebin); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.added"); + gst_object_unref (mq); + + return TRUE; +} + +static void +start_sink (void) +{ + GstElement *pipeline; + GstElement *multiqueue; + + pipeline = gst_element_factory_make ("ipcslavepipeline", NULL); + gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), slave_bus_msg, pipeline); + + multiqueue = gst_element_factory_make ("multiqueue", "mq"); + gst_bin_add (GST_BIN (pipeline), multiqueue); + + g_timeout_add (10, &pipe_reader, gst_object_ref (pipeline)); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.sink"); + /* The state of the slave pipeline will change together with the state + * of the master, there is no need to call gst_element_set_state() here */ +} + + +/********** COMMON ***********/ + +static void +init (int *argc, char ***argv) +{ + GOptionEntry options[] = { + {"audio-sink", 0, 0, G_OPTION_ARG_STRING, &arg_audio_sink, + "Audio sink element to use (default autoaudiosink)", NULL}, + {"video-sink", 0, 0, G_OPTION_ARG_STRING, &arg_video_sink, + "Video sink element to use (default autovideosink)", NULL}, + {NULL} + }; + GOptionContext *ctx; + GError *err = NULL; + + ctx = g_option_context_new (""); + g_option_context_add_main_entries (ctx, options, ""); + if (!g_option_context_parse (ctx, argc, argv, &err)) { + fprintf (stderr, "Error initializing: %s\n", err->message); + exit (1); + } + g_option_context_free (ctx); +} + +static void +run (pid_t pid) +{ + loop = g_main_loop_new (NULL, FALSE); + g_main_loop_run (loop); + if (pid > 0) + kill (pid, SIGTERM); +} + +gint +main (gint argc, gchar ** argv) +{ + GError *error = NULL; + gchar *uri = NULL; + pid_t pid; + + init (&argc, &argv); + + if (argc < 2) { + fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]); + return 1; + } + + if (!g_strstr_len (argv[1], -1, "://")) { + uri = gst_filename_to_uri (argv[1], &error); + } else { + uri = g_strdup (argv[1]); + } + + if (error) { + fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]); + g_clear_error (&error); + return 1; + } + + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, pipes)) { + fprintf (stderr, "Error creating pipes: %s\n", strerror (errno)); + return 2; + } + + pid = fork (); + if (pid < 0) { + fprintf (stderr, "Error forking: %s\n", strerror (errno)); + return 1; + } else if (pid > 0) { + setenv ("GST_DEBUG_FILE", "gstsrc.log", 1); + gst_init (&argc, &argv); + start_source (uri); + } else { + setenv ("GST_DEBUG_FILE", "gstsink.log", 1); + gst_init (&argc, &argv); + start_sink (); + } + + g_free (uri); + run (pid); + + return 0; +} diff --git a/tests/examples/ipcpipeline/ipcpipeline1.c b/tests/examples/ipcpipeline/ipcpipeline1.c new file mode 100644 index 0000000000..4459f0f54f --- /dev/null +++ b/tests/examples/ipcpipeline/ipcpipeline1.c @@ -0,0 +1,183 @@ +/* GStreamer + * + * example program for the ipcpipelinesrc/ipcpipelinesink elements + * + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: Vincent Penquerc'h + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/* + * This program shows a moving ball on a video sink, with the video sink + * running in a different process than videotestsrc. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static GMainLoop *loop = NULL; + +static gboolean +master_bus_msg (GstBus * bus, GstMessage * msg, gpointer data) +{ + GstPipeline *pipeline = data; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR:{ + GError *err; + gchar *dbg; + + gst_message_parse_error (msg, &err, &dbg); + g_printerr ("ERROR: %s\n", err->message); + if (dbg != NULL) + g_printerr ("ERROR debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.error"); + + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_WARNING:{ + GError *err; + gchar *dbg; + + gst_message_parse_warning (msg, &err, &dbg); + g_printerr ("WARNING: %s\n", err->message); + if (dbg != NULL) + g_printerr ("WARNING debug information: %s\n", dbg); + g_error_free (err); + g_free (dbg); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.warning"); + break; + } + case GST_MESSAGE_ASYNC_DONE: + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.async-done"); + break; + case GST_MESSAGE_EOS: + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL); + g_main_loop_quit (loop); + break; + default: + break; + } + return TRUE; +} + +static void +start_source (int fdin, int fdout) +{ + GstElement *pipeline; + GstElement *source, *ipcpipelinesink, *capsfilter; + GstCaps *caps; + + pipeline = gst_pipeline_new (NULL); + gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), master_bus_msg, pipeline); + + source = gst_element_factory_make ("videotestsrc", NULL); + g_object_set (source, "pattern", 18, "num-buffers", 50, NULL); + + capsfilter = gst_element_factory_make ("capsfilter", NULL); + caps = + gst_caps_new_simple ("video/x-raw", "width", G_TYPE_INT, 640, "height", + G_TYPE_INT, 480, NULL); + g_object_set (capsfilter, "caps", caps, NULL); + gst_caps_unref (caps); + + ipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL); + g_object_set (ipcpipelinesink, "fdin", fdin, "fdout", fdout, NULL); + + gst_bin_add_many (GST_BIN (pipeline), source, capsfilter, ipcpipelinesink, + NULL); + gst_element_link_many (source, capsfilter, ipcpipelinesink, NULL); + + gst_element_set_state (pipeline, GST_STATE_PLAYING); + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.src"); +} + +static void +start_sink (int fdin, int fdout) +{ + GstElement *pipeline; + GstElement *ipcpipelinesrc, *navseek, *sink; + + pipeline = gst_element_factory_make ("ipcslavepipeline", NULL); + ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL); + navseek = gst_element_factory_make ("navseek", NULL); + g_object_set (navseek, "seek-offset", 1.0, NULL); + sink = gst_element_factory_make ("autovideosink", NULL); + g_object_set (ipcpipelinesrc, "fdin", fdin, "fdout", fdout, NULL); + gst_bin_add_many (GST_BIN (pipeline), ipcpipelinesrc, navseek, sink, NULL); + gst_element_link_many (ipcpipelinesrc, navseek, sink, NULL); + + GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline), + GST_DEBUG_GRAPH_SHOW_ALL, "ipc.sink"); + /* The state of the slave pipeline will change together with the state + * of the master, there is no need to call gst_element_set_state() here */ +} + +static void +run (pid_t pid) +{ + loop = g_main_loop_new (NULL, FALSE); + g_main_loop_run (loop); + if (pid > 0) + kill (pid, SIGTERM); +} + +int +main (int argc, char **argv) +{ + int sockets[2]; + pid_t pid; + + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sockets) < 0) { + fprintf (stderr, "Error creating sockets: %s\n", strerror (errno)); + return 1; + } + pid = fork (); + if (pid < 0) { + fprintf (stderr, "Error forking: %s\n", strerror (errno)); + return 1; + } else if (pid > 0) { + setenv ("GST_DEBUG_FILE", "gstsrc.log", 1); + gst_init (&argc, &argv); + start_source (sockets[0], sockets[0]); + } else { + setenv ("GST_DEBUG_FILE", "gstsink.log", 1); + gst_init (&argc, &argv); + start_sink (sockets[1], sockets[1]); + } + + run (pid); + + return 0; +}