unixfd: Fix racy unit test by adding wait-for-connection property

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6765>
This commit is contained in:
Xavier Claessens 2024-04-29 14:30:49 -04:00 committed by GStreamer Marge Bot
parent 5da635c2da
commit 9b946098df
3 changed files with 98 additions and 11 deletions

View file

@ -246144,6 +246144,18 @@
"readable": true, "readable": true,
"type": "GUnixSocketAddressType", "type": "GUnixSocketAddressType",
"writable": true "writable": true
},
"wait-for-connection": {
"blurb": "Block the stream until a least one client is connected",
"conditionally-available": false,
"construct": true,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
} }
}, },
"rank": "none" "rank": "none"

View file

@ -87,6 +87,10 @@ struct _GstUnixFdSink
GstCaps *caps; GstCaps *caps;
gboolean uses_monotonic_clock; gboolean uses_monotonic_clock;
GByteArray *payload; GByteArray *payload;
gboolean wait_for_connection;
GCond wait_for_connection_cond;
gboolean unlock;
}; };
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK); G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
@ -94,12 +98,14 @@ GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE,
GST_TYPE_UNIX_FD_SINK); GST_TYPE_UNIX_FD_SINK);
#define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH #define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH
#define DEFAULT_WAIT_FOR_CONNECTION FALSE
enum enum
{ {
PROP_0, PROP_0,
PROP_SOCKET_PATH, PROP_SOCKET_PATH,
PROP_SOCKET_TYPE, PROP_SOCKET_TYPE,
PROP_WAIT_FOR_CONNECTION,
}; };
@ -122,6 +128,7 @@ gst_unix_fd_sink_init (GstUnixFdSink * self)
self->clients = self->clients =
g_hash_table_new_full (NULL, NULL, g_object_unref, g_hash_table_new_full (NULL, NULL, g_object_unref,
(GDestroyNotify) client_free); (GDestroyNotify) client_free);
g_cond_init (&self->wait_for_connection_cond);
} }
static void static void
@ -133,6 +140,7 @@ gst_unix_fd_sink_finalize (GObject * object)
g_main_context_unref (self->context); g_main_context_unref (self->context);
g_main_loop_unref (self->loop); g_main_loop_unref (self->loop);
g_hash_table_unref (self->clients); g_hash_table_unref (self->clients);
g_cond_clear (&self->wait_for_connection_cond);
G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object); G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object);
} }
@ -163,6 +171,10 @@ gst_unix_fd_sink_set_property (GObject * object, guint prop_id,
} }
self->socket_type = g_value_get_enum (value); self->socket_type = g_value_get_enum (value);
break; break;
case PROP_WAIT_FOR_CONNECTION:
self->wait_for_connection = g_value_get_boolean (value);
g_cond_signal (&self->wait_for_connection_cond);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -186,6 +198,9 @@ gst_unix_fd_sink_get_property (GObject * object, guint prop_id,
case PROP_SOCKET_TYPE: case PROP_SOCKET_TYPE:
g_value_set_enum (value, self->socket_type); g_value_set_enum (value, self->socket_type);
break; break;
case PROP_WAIT_FOR_CONNECTION:
g_value_set_boolean (value, self->wait_for_connection);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -314,6 +329,8 @@ new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
} }
g_free (payload); g_free (payload);
g_cond_signal (&self->wait_for_connection_cond);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
return G_SOURCE_CONTINUE; return G_SOURCE_CONTINUE;
@ -551,8 +568,21 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
new_buffer->type = MEMORY_TYPE_DMABUF; new_buffer->type = MEMORY_TYPE_DMABUF;
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
while (self->wait_for_connection && g_hash_table_size (self->clients) == 0) {
g_cond_wait (&self->wait_for_connection_cond, GST_OBJECT_GET_LOCK (self));
if (self->unlock) {
GST_OBJECT_UNLOCK (self);
ret = gst_base_sink_wait_preroll (bsink);
if (ret != GST_FLOW_OK)
goto out;
GST_OBJECT_LOCK (self);
}
}
send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds,
self->payload->data, self->payload->len, buffer); self->payload->data, self->payload->len, buffer);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
out: out:
@ -561,6 +591,31 @@ out:
return ret; return ret;
} }
static gboolean
gst_unix_fd_sink_unlock (GstBaseSink * bsink)
{
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
GST_OBJECT_LOCK (self);
self->unlock = TRUE;
g_cond_signal (&self->wait_for_connection_cond);
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean
gst_unix_fd_sink_unlock_stop (GstBaseSink * bsink)
{
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
GST_OBJECT_LOCK (self);
self->unlock = FALSE;
GST_OBJECT_UNLOCK (self);
return TRUE;
}
static gboolean static gboolean
gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event) gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
{ {
@ -650,6 +705,9 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event); gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event);
gstbasesink_class->propose_allocation = gstbasesink_class->propose_allocation =
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation); GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_unlock_stop);
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
g_param_spec_string ("socket-path", g_param_spec_string ("socket-path",
@ -666,4 +724,18 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
G_TYPE_UNIX_SOCKET_ADDRESS_TYPE, DEFAULT_SOCKET_TYPE, G_TYPE_UNIX_SOCKET_ADDRESS_TYPE, DEFAULT_SOCKET_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT |
GST_PARAM_MUTABLE_READY)); GST_PARAM_MUTABLE_READY));
/**
* GstUnixFdSink:wait-for-connection:
*
* Block the stream until a least one client is connected.
*
* Since: 1.26
*/
g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
g_param_spec_boolean ("wait-for-connection",
"Wait for a connection until rendering",
"Block the stream until a least one client is connected",
DEFAULT_WAIT_FOR_CONNECTION,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
} }

View file

@ -59,11 +59,9 @@ GST_START_TEST (test_unixfd_videotestsrc)
gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL); gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL);
/* Ensure we don't have socket from previous failed test */ /* Ensure we don't have socket from previous failed test */
gchar *socket_path = gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error);
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); g_assert_no_error (error);
if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) { gchar *socket_path = g_strdup_printf ("%s/socket", tempdir);
g_unlink (socket_path);
}
/* Setup source */ /* Setup source */
gchar *pipeline_str = gchar *pipeline_str =
@ -126,6 +124,9 @@ GST_START_TEST (test_unixfd_videotestsrc)
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS)); fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS));
g_rmdir (tempdir);
g_free (tempdir);
gst_object_unref (pipeline_service); gst_object_unref (pipeline_service);
gst_object_unref (pipeline_client_1); gst_object_unref (pipeline_client_1);
gst_object_unref (pipeline_client_2); gst_object_unref (pipeline_client_2);
@ -142,18 +143,16 @@ GST_START_TEST (test_unixfd_segment)
GError *error = NULL; GError *error = NULL;
/* Ensure we don't have socket from previous failed test */ /* Ensure we don't have socket from previous failed test */
gchar *socket_path = gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error);
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); g_assert_no_error (error);
if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) { gchar *socket_path = g_strdup_printf ("%s/socket", tempdir);
g_unlink (socket_path);
}
GstCaps *caps = gst_caps_new_empty_simple ("video/x-raw"); GstCaps *caps = gst_caps_new_empty_simple ("video/x-raw");
/* Setup service */ /* Setup service */
gchar *pipeline_str = gchar *pipeline_str =
g_strdup_printf g_strdup_printf
("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false", ("appsrc name=src format=time handle-segment-change=true ! unixfdsink socket-path=%s sync=false async=false wait-for-connection=true",
socket_path); socket_path);
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error); GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
g_assert_no_error (error); g_assert_no_error (error);
@ -215,6 +214,10 @@ GST_START_TEST (test_unixfd_segment)
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
fail_unless (gst_element_set_state (pipeline_service, fail_unless (gst_element_set_state (pipeline_service,
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
g_rmdir (tempdir);
g_free (tempdir);
gst_object_unref (pipeline_service); gst_object_unref (pipeline_service);
gst_object_unref (pipeline_client); gst_object_unref (pipeline_client);
g_free (socket_path); g_free (socket_path);