uri-asset: Implement multi threading support

Making sure to have 1 GstDiscoverer per thread.

Use that new feature in gesdemux by loading the timeline directly from
the streaming thread. Modifying the timeline is not supported allowed
anyway.
This commit is contained in:
Thibault Saunier 2019-06-16 21:27:47 -04:00
parent 3d11893fd7
commit 61c952c714
3 changed files with 80 additions and 51 deletions

View file

@ -42,7 +42,43 @@
static GHashTable *parent_newparent_table = NULL;
static GstDiscoverer *discoverer = NULL;
G_LOCK_DEFINE_STATIC (discoverers_lock);
static GstClockTime discovering_timeout = DEFAULT_DISCOVERY_TIMEOUT;
static GHashTable *discoverers = NULL; /* Thread ID -> GstDiscoverer */
static void discoverer_discovered_cb (GstDiscoverer * discoverer,
GstDiscovererInfo * info, GError * err, gpointer user_data);
/* WITH discoverers_lock */
static GstDiscoverer *
create_discoverer ()
{
GstDiscoverer *disco = gst_discoverer_new (discovering_timeout, NULL);
g_signal_connect (disco, "discovered", G_CALLBACK (discoverer_discovered_cb),
NULL);
GST_INFO_OBJECT (disco, "Creating new discoverer");
g_hash_table_insert (discoverers, g_thread_self (), disco);
gst_discoverer_start (disco);
return disco;
}
static GstDiscoverer *
get_discoverer ()
{
GstDiscoverer *disco;
G_LOCK (discoverers_lock);
g_assert (discoverers);
disco = g_hash_table_lookup (discoverers, g_thread_self ());
if (!disco) {
disco = create_discoverer ();
}
disco = gst_object_ref (disco);
G_UNLOCK (discoverers_lock);
return disco;
}
static void
initable_iface_init (GInitableIface * initable_iface)
@ -62,9 +98,6 @@ enum
};
static GParamSpec *properties[PROP_LAST];
static void discoverer_discovered_cb (GstDiscoverer * discoverer,
GstDiscovererInfo * info, GError * err, gpointer user_data);
struct _GESUriClipAssetPrivate
{
GstDiscovererInfo *info;
@ -128,12 +161,14 @@ _start_loading (GESAsset * asset, GError ** error)
{
gboolean ret;
const gchar *uri;
GESUriClipAssetClass *class = GES_URI_CLIP_ASSET_GET_CLASS (asset);
GstDiscoverer *discoverer = get_discoverer ();
uri = ges_asset_get_id (asset);
GST_DEBUG_OBJECT (discoverer, "Started loading %s", uri);
ret = gst_discoverer_discover_uri_async (discoverer, uri);
gst_object_unref (discoverer);
GST_DEBUG_OBJECT (asset, "Started loading %s", uri);
ret = gst_discoverer_discover_uri_async (class->discoverer, uri);
if (ret)
return GES_ASSET_LOADING_ASYNC;
@ -568,9 +603,7 @@ ges_uri_clip_asset_request_sync (const gchar * uri, GError ** error)
GError *lerror = NULL;
GESUriClipAsset *asset;
RequestSyncData data = { 0, };
GESUriClipAssetClass *klass = g_type_class_peek (GES_TYPE_URI_CLIP_ASSET);
GstClockTime timeout;
GstDiscoverer *previous_discoverer = klass->discoverer;
GstDiscoverer *previous_discoverer;
asset = GES_URI_CLIP_ASSET (ges_asset_request (GES_TYPE_URI_CLIP, uri,
&lerror));
@ -579,25 +612,17 @@ ges_uri_clip_asset_request_sync (const gchar * uri, GError ** error)
return asset;
data.ml = g_main_loop_new (NULL, TRUE);
g_object_get (previous_discoverer, "timeout", &timeout, NULL);
klass->discoverer = gst_discoverer_new (timeout, error);
g_object_set (klass->discoverer, "use-cache", TRUE, NULL);
if (!klass->discoverer) {
klass->discoverer = previous_discoverer;
previous_discoverer = get_discoverer ();
create_discoverer ();
return NULL;
}
g_signal_connect (klass->discoverer, "discovered",
G_CALLBACK (klass->discovered), NULL);
gst_discoverer_start (klass->discoverer);
ges_asset_request_async (GES_TYPE_URI_CLIP, uri, NULL,
(GAsyncReadyCallback) asset_ready_cb, &data);
g_main_loop_run (data.ml);
g_main_loop_unref (data.ml);
gst_object_unref (klass->discoverer);
klass->discoverer = previous_discoverer;
G_LOCK (discoverers_lock);
g_hash_table_insert (discoverers, g_thread_self (), previous_discoverer);
G_UNLOCK (discoverers_lock);
if (data.error) {
GST_ERROR ("Got an error requesting asset: %s", data.error->message);
@ -621,9 +646,18 @@ void
ges_uri_clip_asset_class_set_timeout (GESUriClipAssetClass * klass,
GstClockTime timeout)
{
GHashTableIter iter;
gpointer value;
g_return_if_fail (GES_IS_URI_CLIP_ASSET_CLASS (klass));
g_object_set (klass->discoverer, "timeout", timeout, NULL);
discovering_timeout = timeout;
G_LOCK (discoverers_lock);
g_hash_table_iter_init (&iter, discoverers);
while (g_hash_table_iter_next (&iter, NULL, &value))
g_object_set (value, "timeout", timeout, NULL);
G_UNLOCK (discoverers_lock);
}
/**
@ -774,13 +808,17 @@ ges_uri_source_asset_get_filesource_asset (GESUriSourceAsset * asset)
void
_ges_uri_asset_cleanup (void)
{
if (discoverer)
gst_discoverer_stop (discoverer);
g_clear_object (&discoverer);
if (parent_newparent_table) {
g_hash_table_destroy (parent_newparent_table);
parent_newparent_table = NULL;
}
G_LOCK (discoverers_lock);
if (discoverers) {
g_hash_table_destroy (discoverers);
discoverers = NULL;
}
G_UNLOCK (discoverers_lock);
}
gboolean
@ -790,6 +828,7 @@ _ges_uri_asset_ensure_setup (gpointer uriasset_class)
GError *err;
GstClockTime timeout;
const gchar *timeout_str;
GstDiscoverer *discoverer = NULL;
g_return_val_if_fail (GES_IS_URI_CLIP_ASSET_CLASS (uriasset_class), FALSE);
@ -826,10 +865,17 @@ _ges_uri_asset_ensure_setup (gpointer uriasset_class)
g_signal_connect (klass->discoverer, "discovered",
G_CALLBACK (klass->discovered), NULL);
gst_discoverer_start (klass->discoverer);
}
G_LOCK (discoverers_lock);
if (discoverers == NULL) {
discoverers = g_hash_table_new_full (g_direct_hash,
(GEqualFunc) g_direct_equal, NULL, g_object_unref);
}
G_UNLOCK (discoverers_lock);
/* We just start the discoverer and let it live */
gst_discoverer_start (klass->discoverer);
if (parent_newparent_table == NULL) {
parent_newparent_table = g_hash_table_new_full (g_file_hash,
(GEqualFunc) g_file_equal, g_object_unref, g_object_unref);

View file

@ -62,11 +62,13 @@ struct _GESUriClipAssetClass
GESClipAssetClass parent_class;
/* <private> */
GstDiscoverer *discoverer;
GstDiscoverer *sync_discoverer;
GstDiscoverer *discoverer; /* Unused */
GstDiscoverer *sync_discoverer; /* Unused */
void (*discovered) (GstDiscoverer * discoverer, GstDiscovererInfo * info,
GError * err, gpointer user_data);
void (*discovered) (GstDiscoverer * discoverer, /* Unused */
GstDiscovererInfo * info,
GError * err,
gpointer user_data);
gpointer _ges_reserved[GES_PADDING -1];
};

View file

@ -171,26 +171,16 @@ error_loading_asset_cb (GESProject * project, GError * error, gchar * id,
g_main_loop_quit (data->ml);
}
/* TODO: Add a way to run a function in the right GES thread */
static gboolean
ges_timeline_new_from_uri_from_main_thread (TimelineConstructionData * data)
{
GESProject *project = ges_project_new (data->uri);
GESUriClipAssetClass *klass = g_type_class_peek (GES_TYPE_URI_CLIP_ASSET);
GstDiscoverer *previous_discoverer = klass->discoverer;
GstClockTime timeout;
G_GNUC_UNUSED void *unused;
g_object_get (previous_discoverer, "timeout", &timeout, NULL);
/* Make sure to use a new discoverer in case we are being discovered,
* as discovering is done one by one, and the global discoverer won't
* have the chance to discover the project assets */
g_mutex_lock (&data->lock);
klass->discoverer = gst_discoverer_new (timeout, &data->error);
g_object_set (klass->discoverer, "use-cache", TRUE, NULL);
if (data->error) {
klass->discoverer = previous_discoverer;
g_mutex_unlock (&data->lock);
goto done;
@ -222,15 +212,6 @@ done:
g_mutex_lock (&data->lock);
/* Set previous discoverer back! */
if (klass->discoverer)
gst_object_unref (klass->discoverer);
klass->discoverer = previous_discoverer;
if (data->timeline)
ges_timeline_commit (data->timeline);
if (data->loaded_sigid)
g_signal_handler_disconnect (project, data->loaded_sigid);