scenario: Ensure that messages are handled from the right thread

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-devtools/-/merge_requests/233>
This commit is contained in:
Thibault Saunier 2020-12-10 16:26:15 -03:00
parent 59c3802711
commit 373071a743

View file

@ -268,6 +268,8 @@ struct _GstValidateScenarioPrivate
GstTestClock *clock; GstTestClock *clock;
guint segments_needed; guint segments_needed;
GMainContext *context;
}; };
typedef struct KeyFileGroupName typedef struct KeyFileGroupName
@ -360,6 +362,8 @@ struct _GstValidateActionPrivate
GWeakRef scenario; GWeakRef scenario;
gboolean needs_playback_parsing; gboolean needs_playback_parsing;
gboolean pending_set_done; gboolean pending_set_done;
GMainContext *context;
}; };
static JsonNode * static JsonNode *
@ -2436,7 +2440,9 @@ gst_validate_execute_action (GstValidateActionType * action_type,
GST_VALIDATE_EXECUTE_ACTION_ERROR); GST_VALIDATE_EXECUTE_ACTION_ERROR);
scenario = gst_validate_action_get_scenario (action); scenario = gst_validate_action_get_scenario (action);
g_assert (scenario);
action->priv->context = g_main_context_ref (scenario->priv->context);
if (action_type->prepare) { if (action_type->prepare) {
res = action_type->prepare (action); res = action_type->prepare (action);
if (res == GST_VALIDATE_EXECUTE_ACTION_DONE) { if (res == GST_VALIDATE_EXECUTE_ACTION_DONE) {
@ -3903,17 +3909,33 @@ gst_validate_scenario_reset (GstValidateScenario * scenario)
SCENARIO_UNLOCK (scenario); SCENARIO_UNLOCK (scenario);
} }
typedef struct
{
GstValidateScenario *scenario;
GstMessage *message;
} MessageData;
static void
message_data_free (MessageData * d)
{
gst_message_unref (d->message);
gst_object_unref (d->scenario);
g_free (d);
}
static gboolean static gboolean
message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario) handle_bus_message (MessageData * d)
{ {
gboolean is_error = FALSE; gboolean is_error = FALSE;
GstMessage *message = d->message;
GstValidateScenario *scenario = d->scenario;
GstValidateScenarioPrivate *priv = scenario->priv; GstValidateScenarioPrivate *priv = scenario->priv;
GstElement *pipeline = gst_validate_scenario_get_pipeline (scenario); GstElement *pipeline = gst_validate_scenario_get_pipeline (scenario);
if (!pipeline) { if (!pipeline) {
GST_ERROR_OBJECT (scenario, "No pipeline set anymore!"); GST_ERROR_OBJECT (scenario, "No pipeline set anymore!");
return G_SOURCE_REMOVE;
return FALSE;
} }
GST_DEBUG_OBJECT (scenario, "message %" GST_PTR_FORMAT, message); GST_DEBUG_OBJECT (scenario, "message %" GST_PTR_FORMAT, message);
@ -3931,7 +3953,7 @@ message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
if (priv->needs_playback_parsing) { if (priv->needs_playback_parsing) {
priv->needs_playback_parsing = FALSE; priv->needs_playback_parsing = FALSE;
if (!gst_validate_parse_next_action_playback_time (scenario)) if (!gst_validate_parse_next_action_playback_time (scenario))
return FALSE; return G_SOURCE_REMOVE;
} }
_add_execute_actions_gsource (scenario); _add_execute_actions_gsource (scenario);
break; break;
@ -4164,7 +4186,20 @@ done:
execute_next_action_full (scenario, message); execute_next_action_full (scenario, message);
return TRUE; return G_SOURCE_REMOVE;
}
static void
message_cb (GstBus * bus, GstMessage * message, GstValidateScenario * scenario)
{
MessageData *d = g_new0 (MessageData, 1);
d->message = gst_message_ref (message);
d->scenario = gst_object_ref (scenario);
g_main_context_invoke_full (scenario->priv->context,
G_PRIORITY_DEFAULT_IDLE,
(GSourceFunc) handle_bus_message, d, (GDestroyNotify) message_data_free);
} }
static gboolean static gboolean
@ -4573,6 +4608,11 @@ gst_validate_scenario_init (GstValidateScenario * scenario)
priv->clock = NULL; priv->clock = NULL;
g_mutex_init (&priv->lock); g_mutex_init (&priv->lock);
scenario->priv->context = g_main_context_get_thread_default ();
if (!scenario->priv->context)
scenario->priv->context = g_main_context_default ();
g_main_context_ref (scenario->priv->context);
} }
static void static void
@ -4601,8 +4641,11 @@ gst_validate_scenario_finalize (GObject * object)
/* Because g_object_add_weak_pointer() is used, this MUST be on the /* Because g_object_add_weak_pointer() is used, this MUST be on the
* main thread. */ * main thread. */
g_assert (g_main_context_acquire (g_main_context_default ())); g_assert (g_main_context_acquire (priv->context));
g_main_context_release (g_main_context_default ()); g_main_context_release (priv->context);
g_main_context_unref (priv->context);
priv->context = NULL;
g_list_free_full (priv->seeks, g_list_free_full (priv->seeks,
(GDestroyNotify) gst_validate_seek_information_free); (GDestroyNotify) gst_validate_seek_information_free);
@ -5830,7 +5873,9 @@ _action_set_done (GstValidateAction * action)
void void
gst_validate_action_set_done (GstValidateAction * action) gst_validate_action_set_done (GstValidateAction * action)
{ {
GMainContext *context = action->priv->context;
action->priv->context = NULL;
if (action->priv->state == GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING) { if (action->priv->state == GST_VALIDATE_EXECUTE_ACTION_NON_BLOCKING) {
GstValidateScenario *scenario = gst_validate_action_get_scenario (action); GstValidateScenario *scenario = gst_validate_action_get_scenario (action);
GList *item = NULL; GList *item = NULL;
@ -5852,10 +5897,14 @@ gst_validate_action_set_done (GstValidateAction * action)
g_assert (!action->priv->pending_set_done); g_assert (!action->priv->pending_set_done);
action->priv->pending_set_done = TRUE; action->priv->pending_set_done = TRUE;
g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT_IDLE, g_main_context_invoke_full (action->priv->context,
G_PRIORITY_DEFAULT_IDLE,
(GSourceFunc) _action_set_done, (GSourceFunc) _action_set_done,
gst_validate_action_ref (action), gst_validate_action_ref (action),
(GDestroyNotify) gst_validate_action_unref); (GDestroyNotify) gst_validate_action_unref);
if (context)
g_main_context_unref (context);
} }
/** /**