validate: scenario: Add a "select-streams" action type

This is a "non-blocking" action type which will send the `select-streams`
event when a `GST_STREAM_COLLECTION` message is received on the bus.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5666>
This commit is contained in:
Thibault Saunier 2023-07-31 08:32:04 -04:00 committed by GStreamer Marge Bot
parent d81fe1352c
commit e251522805

View file

@ -2124,6 +2124,162 @@ done:
return res; return res;
} }
typedef struct
{
GstValidateAction *action;
GRecMutex m;
gulong sid;
GList *wanted_streams;
} SelectStreamData;
static SelectStreamData *
select_stream_data_new (GstValidateAction * action)
{
SelectStreamData *d = g_new0 (SelectStreamData, 1);
d->action = action;
return d;
}
static void
select_stream_data_free (SelectStreamData * d)
{
gst_validate_action_unref (d->action);
g_list_free_full (d->wanted_streams, g_free);
g_free (d);
}
static void
stream_selection_cb (GstBus * bus, GstMessage * message, SelectStreamData * d)
{
GstValidateScenario *scenario = NULL;
GstStreamCollection *collection = NULL, *selected_streams = NULL;
GList *streams = NULL;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_STREAM_COLLECTION:
/* Handle the case were we have 2 StreamCollection message happennning at the
* same time on the bus */
g_rec_mutex_lock (&d->m);
scenario = gst_validate_action_get_scenario (d->action);
gst_message_parse_stream_collection (message, &collection);
g_assert (collection);
break;
case GST_MESSAGE_STREAMS_SELECTED:
g_rec_mutex_lock (&d->m);
gst_message_parse_streams_selected (message, &selected_streams);
g_assert (selected_streams);
goto done;
default:
return;
}
const GValue *v = gst_structure_get_value (d->action->structure, "indexes");
if (G_VALUE_HOLDS_INT (v)) {
GstStream *stream =
gst_stream_collection_get_stream (collection, g_value_get_int (v));
if (!stream) {
GST_VALIDATE_REPORT_ACTION (scenario, d->action,
SCENARIO_ACTION_EXECUTION_ERROR,
"Could not find stream with index %d in %" GST_PTR_FORMAT,
g_value_get_int (v), collection);
goto done;
}
streams =
g_list_append (streams, g_strdup (gst_stream_get_stream_id (stream)));
} else if (GST_VALUE_HOLDS_ARRAY (v)) {
for (gint i = 0; i < gst_value_array_get_size (v); i++) {
const GValue *ivalue = gst_value_array_get_value (v, i);
if (!G_VALUE_HOLDS_INT (ivalue)) {
gst_validate_error_structure (d->action,
"Could not parse `indexes` in %" GST_PTR_FORMAT,
d->action->structure);
goto done;
}
GstStream *stream = gst_stream_collection_get_stream (collection,
g_value_get_int (ivalue));
if (!stream) {
GST_VALIDATE_REPORT_ACTION (scenario, d->action,
SCENARIO_ACTION_EXECUTION_ERROR,
"Could not find stream with index %d in %" GST_PTR_FORMAT,
g_value_get_int (ivalue), collection);
goto done;
}
streams =
g_list_append (streams, g_strdup (gst_stream_get_stream_id (stream)));
}
} else {
gst_validate_error_structure (d->action,
"Could not parse `indexes` in %" GST_PTR_FORMAT, d->action->structure);
goto done;
}
GstElement *pipeline = gst_validate_scenario_get_pipeline (scenario);
if (pipeline == NULL) {
GST_VALIDATE_REPORT_ACTION (scenario, d->action,
SCENARIO_ACTION_EXECUTION_ERROR,
"Can't execute a '%s' action after the pipeline " "has been destroyed.",
d->action->type);
goto done;
}
if (!gst_element_send_event (GST_ELEMENT (GST_MESSAGE_SRC (message)),
gst_event_new_select_streams (streams))) {
GST_VALIDATE_REPORT_ACTION (scenario, d->action,
SCENARIO_ACTION_EXECUTION_ERROR,
"Could not send `SELECT_STREAM` event!");
}
g_list_free_full (d->wanted_streams, g_free);
d->wanted_streams = streams;
done:
if (selected_streams && d->sid) {
/* Consider action done once we get the STREAM_SELECTED signal */
gst_validate_action_set_done (gst_validate_action_ref (d->action));
gst_bus_disable_sync_message_emission (bus);
g_signal_handler_disconnect (bus, d->sid);
d->sid = 0;
}
gst_clear_object (&scenario);
gst_clear_object (&collection);
g_rec_mutex_unlock (&d->m);
}
static GstValidateExecuteActionReturn
_execute_select_streams (GstValidateScenario * scenario,
GstValidateAction * action)
{
DECLARE_AND_GET_PIPELINE (scenario, action);
GstBus *bus = gst_element_get_bus (pipeline);
gst_bus_enable_sync_message_emission (bus);
SelectStreamData *d = select_stream_data_new (action);
/* Ensure that the data signal ID is set before the callback is called */
g_rec_mutex_lock (&d->m);
d->sid = g_signal_connect_data (bus,
"sync-message",
G_CALLBACK (stream_selection_cb),
d, (GClosureNotify) select_stream_data_free, 0);
g_rec_mutex_unlock (&d->m);
gst_object_unref (bus);
return GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING;
}
static GstValidateExecuteActionReturn static GstValidateExecuteActionReturn
_execute_switch_track (GstValidateScenario * scenario, _execute_switch_track (GstValidateScenario * scenario,
GstValidateAction * action) GstValidateAction * action)
@ -7056,6 +7212,20 @@ register_action_types (void)
"Sends an EOS event to the pipeline", "Sends an EOS event to the pipeline",
GST_VALIDATE_ACTION_TYPE_NO_EXECUTION_NOT_FATAL); GST_VALIDATE_ACTION_TYPE_NO_EXECUTION_NOT_FATAL);
REGISTER_ACTION_TYPE ("select-streams", _execute_select_streams,
((GstValidateActionParameter []) {
{
.name = "indexes",
.description = "Indexes of the streams in the StreamCollection to select",
.mandatory = TRUE,
.types = "[int]",
.possible_variables = NULL,
},
{NULL}
}),
"Select the stream on next `GST_STREAM_COLLECTION` message on the bus.",
GST_VALIDATE_ACTION_TYPE_NON_BLOCKING);
REGISTER_ACTION_TYPE ("switch-track", _execute_switch_track, REGISTER_ACTION_TYPE ("switch-track", _execute_switch_track,
((GstValidateActionParameter []) { ((GstValidateActionParameter []) {
{ {