diff --git a/subprojects/gst-devtools/validate/gst/validate/gst-validate-scenario.c b/subprojects/gst-devtools/validate/gst/validate/gst-validate-scenario.c index 81587e765c..27cff22282 100644 --- a/subprojects/gst-devtools/validate/gst/validate/gst-validate-scenario.c +++ b/subprojects/gst-devtools/validate/gst/validate/gst-validate-scenario.c @@ -2124,6 +2124,162 @@ done: return res; } +typedef struct +{ + GstValidateAction *action; + GRecMutex m; + gulong sid; + + GList *wanted_streams; +} SelectStreamData; + +static SelectStreamData * +select_stream_data_new (GstValidateAction * action) +{ + SelectStreamData *d = g_new0 (SelectStreamData, 1); + + d->action = action; + + return d; +} + +static void +select_stream_data_free (SelectStreamData * d) +{ + gst_validate_action_unref (d->action); + g_list_free_full (d->wanted_streams, g_free); + g_free (d); +} + + +static void +stream_selection_cb (GstBus * bus, GstMessage * message, SelectStreamData * d) +{ + GstValidateScenario *scenario = NULL; + GstStreamCollection *collection = NULL, *selected_streams = NULL; + GList *streams = NULL; + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_STREAM_COLLECTION: + /* Handle the case were we have 2 StreamCollection message happennning at the + * same time on the bus */ + g_rec_mutex_lock (&d->m); + scenario = gst_validate_action_get_scenario (d->action); + gst_message_parse_stream_collection (message, &collection); + g_assert (collection); + break; + case GST_MESSAGE_STREAMS_SELECTED: + g_rec_mutex_lock (&d->m); + gst_message_parse_streams_selected (message, &selected_streams); + g_assert (selected_streams); + goto done; + default: + return; + } + + const GValue *v = gst_structure_get_value (d->action->structure, "indexes"); + if (G_VALUE_HOLDS_INT (v)) { + GstStream *stream = + gst_stream_collection_get_stream (collection, g_value_get_int (v)); + + if (!stream) { + GST_VALIDATE_REPORT_ACTION (scenario, d->action, + SCENARIO_ACTION_EXECUTION_ERROR, + "Could not find stream with index %d in %" GST_PTR_FORMAT, + g_value_get_int (v), collection); + goto done; + } + + streams = + g_list_append (streams, g_strdup (gst_stream_get_stream_id (stream))); + } else if (GST_VALUE_HOLDS_ARRAY (v)) { + for (gint i = 0; i < gst_value_array_get_size (v); i++) { + const GValue *ivalue = gst_value_array_get_value (v, i); + + if (!G_VALUE_HOLDS_INT (ivalue)) { + gst_validate_error_structure (d->action, + "Could not parse `indexes` in %" GST_PTR_FORMAT, + d->action->structure); + goto done; + } + + GstStream *stream = gst_stream_collection_get_stream (collection, + g_value_get_int (ivalue)); + if (!stream) { + GST_VALIDATE_REPORT_ACTION (scenario, d->action, + SCENARIO_ACTION_EXECUTION_ERROR, + "Could not find stream with index %d in %" GST_PTR_FORMAT, + g_value_get_int (ivalue), collection); + goto done; + } + streams = + g_list_append (streams, g_strdup (gst_stream_get_stream_id (stream))); + } + } else { + gst_validate_error_structure (d->action, + "Could not parse `indexes` in %" GST_PTR_FORMAT, d->action->structure); + goto done; + } + + + GstElement *pipeline = gst_validate_scenario_get_pipeline (scenario); + if (pipeline == NULL) { + GST_VALIDATE_REPORT_ACTION (scenario, d->action, + SCENARIO_ACTION_EXECUTION_ERROR, + "Can't execute a '%s' action after the pipeline " "has been destroyed.", + d->action->type); + + goto done; + } + + if (!gst_element_send_event (GST_ELEMENT (GST_MESSAGE_SRC (message)), + gst_event_new_select_streams (streams))) { + GST_VALIDATE_REPORT_ACTION (scenario, d->action, + SCENARIO_ACTION_EXECUTION_ERROR, + "Could not send `SELECT_STREAM` event!"); + } + + g_list_free_full (d->wanted_streams, g_free); + d->wanted_streams = streams; + +done: + if (selected_streams && d->sid) { + /* Consider action done once we get the STREAM_SELECTED signal */ + gst_validate_action_set_done (gst_validate_action_ref (d->action)); + gst_bus_disable_sync_message_emission (bus); + g_signal_handler_disconnect (bus, d->sid); + d->sid = 0; + } + + gst_clear_object (&scenario); + gst_clear_object (&collection); + + g_rec_mutex_unlock (&d->m); +} + +static GstValidateExecuteActionReturn +_execute_select_streams (GstValidateScenario * scenario, + GstValidateAction * action) +{ + DECLARE_AND_GET_PIPELINE (scenario, action); + + GstBus *bus = gst_element_get_bus (pipeline); + gst_bus_enable_sync_message_emission (bus); + + SelectStreamData *d = select_stream_data_new (action); + /* Ensure that the data signal ID is set before the callback is called */ + g_rec_mutex_lock (&d->m); + d->sid = g_signal_connect_data (bus, + "sync-message", + G_CALLBACK (stream_selection_cb), + d, (GClosureNotify) select_stream_data_free, 0); + g_rec_mutex_unlock (&d->m); + + gst_object_unref (bus); + + return GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING; +} + static GstValidateExecuteActionReturn _execute_switch_track (GstValidateScenario * scenario, GstValidateAction * action) @@ -7056,6 +7212,20 @@ register_action_types (void) "Sends an EOS event to the pipeline", GST_VALIDATE_ACTION_TYPE_NO_EXECUTION_NOT_FATAL); + REGISTER_ACTION_TYPE ("select-streams", _execute_select_streams, + ((GstValidateActionParameter []) { + { + .name = "indexes", + .description = "Indexes of the streams in the StreamCollection to select", + .mandatory = TRUE, + .types = "[int]", + .possible_variables = NULL, + }, + {NULL} + }), + "Select the stream on next `GST_STREAM_COLLECTION` message on the bus.", + GST_VALIDATE_ACTION_TYPE_NON_BLOCKING); + REGISTER_ACTION_TYPE ("switch-track", _execute_switch_track, ((GstValidateActionParameter []) { {