diff --git a/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json b/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json index 710af25210..e536fcf1be 100644 --- a/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json +++ b/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json @@ -246144,6 +246144,18 @@ "readable": true, "type": "GUnixSocketAddressType", "writable": true + }, + "wait-for-connection": { + "blurb": "Block the stream until a least one client is connected", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true } }, "rank": "none" diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c index fa4e39b11e..bd894ade7b 100644 --- a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c @@ -87,6 +87,10 @@ struct _GstUnixFdSink GstCaps *caps; gboolean uses_monotonic_clock; GByteArray *payload; + + gboolean wait_for_connection; + GCond wait_for_connection_cond; + gboolean unlock; }; G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK); @@ -94,12 +98,14 @@ GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE, GST_TYPE_UNIX_FD_SINK); #define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH +#define DEFAULT_WAIT_FOR_CONNECTION FALSE enum { PROP_0, PROP_SOCKET_PATH, PROP_SOCKET_TYPE, + PROP_WAIT_FOR_CONNECTION, }; @@ -122,6 +128,7 @@ gst_unix_fd_sink_init (GstUnixFdSink * self) self->clients = g_hash_table_new_full (NULL, NULL, g_object_unref, (GDestroyNotify) client_free); + g_cond_init (&self->wait_for_connection_cond); } static void @@ -133,6 +140,7 @@ gst_unix_fd_sink_finalize (GObject * object) g_main_context_unref (self->context); g_main_loop_unref (self->loop); g_hash_table_unref (self->clients); + g_cond_clear (&self->wait_for_connection_cond); G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object); } @@ -163,6 +171,10 @@ gst_unix_fd_sink_set_property (GObject * object, guint prop_id, } self->socket_type = g_value_get_enum (value); break; + case PROP_WAIT_FOR_CONNECTION: + self->wait_for_connection = g_value_get_boolean (value); + g_cond_signal (&self->wait_for_connection_cond); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -186,6 +198,9 @@ gst_unix_fd_sink_get_property (GObject * object, guint prop_id, case PROP_SOCKET_TYPE: g_value_set_enum (value, self->socket_type); break; + case PROP_WAIT_FOR_CONNECTION: + g_value_set_boolean (value, self->wait_for_connection); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -314,6 +329,8 @@ new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data) } g_free (payload); + g_cond_signal (&self->wait_for_connection_cond); + GST_OBJECT_UNLOCK (self); return G_SOURCE_CONTINUE; @@ -551,8 +568,21 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) new_buffer->type = MEMORY_TYPE_DMABUF; GST_OBJECT_LOCK (self); + + while (self->wait_for_connection && g_hash_table_size (self->clients) == 0) { + g_cond_wait (&self->wait_for_connection_cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + ret = gst_base_sink_wait_preroll (bsink); + if (ret != GST_FLOW_OK) + goto out; + GST_OBJECT_LOCK (self); + } + } + send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, self->payload->data, self->payload->len, buffer); + GST_OBJECT_UNLOCK (self); out: @@ -561,6 +591,31 @@ out: return ret; } +static gboolean +gst_unix_fd_sink_unlock (GstBaseSink * bsink) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + + GST_OBJECT_LOCK (self); + self->unlock = TRUE; + g_cond_signal (&self->wait_for_connection_cond); + GST_OBJECT_UNLOCK (self); + + return TRUE; +} + +static gboolean +gst_unix_fd_sink_unlock_stop (GstBaseSink * bsink) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + + GST_OBJECT_LOCK (self); + self->unlock = FALSE; + GST_OBJECT_UNLOCK (self); + + return TRUE; +} + static gboolean gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event) { @@ -650,6 +705,9 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass) gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event); gstbasesink_class->propose_allocation = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock); + gstbasesink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock_stop); g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, g_param_spec_string ("socket-path", @@ -666,4 +724,18 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass) G_TYPE_UNIX_SOCKET_ADDRESS_TYPE, DEFAULT_SOCKET_TYPE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT | GST_PARAM_MUTABLE_READY)); + + /** + * GstUnixFdSink:wait-for-connection: + * + * Block the stream until a least one client is connected. + * + * Since: 1.26 + */ + g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION, + g_param_spec_boolean ("wait-for-connection", + "Wait for a connection until rendering", + "Block the stream until a least one client is connected", + DEFAULT_WAIT_FOR_CONNECTION, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); } diff --git a/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c index 832b502217..0bde3af467 100644 --- a/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c +++ b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c @@ -59,11 +59,9 @@ GST_START_TEST (test_unixfd_videotestsrc) gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL); /* Ensure we don't have socket from previous failed test */ - gchar *socket_path = - g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); - if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) { - g_unlink (socket_path); - } + gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error); + g_assert_no_error (error); + gchar *socket_path = g_strdup_printf ("%s/socket", tempdir); /* Setup source */ gchar *pipeline_str = @@ -126,6 +124,9 @@ GST_START_TEST (test_unixfd_videotestsrc) GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS)); + g_rmdir (tempdir); + g_free (tempdir); + gst_object_unref (pipeline_service); gst_object_unref (pipeline_client_1); gst_object_unref (pipeline_client_2); @@ -142,18 +143,16 @@ GST_START_TEST (test_unixfd_segment) GError *error = NULL; /* Ensure we don't have socket from previous failed test */ - gchar *socket_path = - g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); - if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) { - g_unlink (socket_path); - } + gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error); + g_assert_no_error (error); + gchar *socket_path = g_strdup_printf ("%s/socket", tempdir); GstCaps *caps = gst_caps_new_empty_simple ("video/x-raw"); /* Setup service */ gchar *pipeline_str = g_strdup_printf - ("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false", + ("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false wait-for-connection=true", socket_path); GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error); g_assert_no_error (error); @@ -215,6 +214,10 @@ GST_START_TEST (test_unixfd_segment) GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); fail_unless (gst_element_set_state (pipeline_service, GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + + g_rmdir (tempdir); + g_free (tempdir); + gst_object_unref (pipeline_service); gst_object_unref (pipeline_client); g_free (socket_path);