From b8a7b2c9e2b44f38f04fc8de5cc5314b9ea24546 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Mon, 6 Nov 2023 09:47:04 +0000 Subject: [PATCH] ges: discoverer-manager: Support for delayed clean-up of discoverers The manager keeps track of one discoverer per thread and in large applications with hundreds of threads this can significantly increase memory pressure. So we need to periodically clean-up the unused discoverers. Part-of: --- .../ges/ges-discoverer-manager.c | 145 +++++++++++++++--- .../gst-editing-services/ges/ges-internal.h | 2 + .../gst-editing-services/ges/ges-utils.c | 14 ++ 3 files changed, 139 insertions(+), 22 deletions(-) diff --git a/subprojects/gst-editing-services/ges/ges-discoverer-manager.c b/subprojects/gst-editing-services/ges/ges-discoverer-manager.c index 804de14366..fd8baf619a 100644 --- a/subprojects/gst-editing-services/ges/ges-discoverer-manager.c +++ b/subprojects/gst-editing-services/ges/ges-discoverer-manager.c @@ -1,12 +1,44 @@ #include "ges-internal.h" #include "ges-discoverer-manager.h" + +typedef struct +{ + GWeakRef /* */ manager; + GstDiscoverer *discoverer; + GThread *thread; + gint n_uri; + gulong load_serialized_info_id; + gulong source_setup_id; + gulong discovered_id; +} GESDiscovererData; + +static void +ges_discoverer_data_free (GESDiscovererData * data) +{ + GST_LOG ("Freeing discoverer %" GST_PTR_FORMAT, data->discoverer); + g_assert (data->n_uri == 0); + gst_discoverer_stop (data->discoverer); + g_signal_handler_disconnect (data->discoverer, data->load_serialized_info_id); + g_signal_handler_disconnect (data->discoverer, data->source_setup_id); + g_signal_handler_disconnect (data->discoverer, data->discovered_id); + g_weak_ref_clear (&data->manager); + + g_object_unref (data->discoverer); +} + +static void +ges_discoverer_data_unref (GESDiscovererData * data) +{ + g_atomic_rc_box_release_full (data, + (GDestroyNotify) ges_discoverer_data_free); +} + /** * GESDiscovererManager: * * Since: 1.24 */ - struct _GESDiscovererManager { GObject parent; @@ -86,8 +118,28 @@ static void ges_discoverer_manager_finalize (GObject * object) { GESDiscovererManager *self = GES_DISCOVERER_MANAGER (object); + GHashTableIter iter; + GESDiscovererData *discoverer_data; + GMainContext *context = g_main_context_get_thread_default (); + + if (!context) + context = g_main_context_default (); + + + g_mutex_lock (&self->lock); + g_hash_table_iter_init (&iter, self->discoverers); + while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer_data)) { + GSource *source; + + while ((source = + g_main_context_find_source_by_user_data (context, + discoverer_data))) { + g_source_destroy (source); + } + } g_hash_table_unref (self->discoverers); + g_mutex_unlock (&self->lock); G_OBJECT_CLASS (ges_discoverer_manager_parent_class)->finalize (object); } @@ -187,7 +239,7 @@ void ges_discoverer_manager_init (GESDiscovererManager * self) { self->discoverers = g_hash_table_new_full (g_direct_hash, g_str_equal, - NULL, g_object_unref); + NULL, (GDestroyNotify) ges_discoverer_data_unref); } @@ -276,7 +328,7 @@ ges_discoverer_manager_set_timeout (GESDiscovererManager * self, GstClockTime timeout) { GHashTableIter iter; - GstDiscoverer *discoverer; + GESDiscovererData *discoverer_data; g_return_if_fail (GES_IS_DISCOVERER_MANAGER (self)); @@ -284,8 +336,8 @@ ges_discoverer_manager_set_timeout (GESDiscovererManager * self, g_mutex_lock (&self->lock); g_hash_table_iter_init (&iter, self->discoverers); - while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer)) - g_object_set (discoverer, "timeout", timeout, NULL); + while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & discoverer_data)) + g_object_set (discoverer_data->discoverer, "timeout", timeout, NULL); g_mutex_unlock (&self->lock); } @@ -305,37 +357,86 @@ source_setup_cb (GESDiscovererManager * self, GstElement * source) g_signal_emit (self, signals[DISCOVERER_SOURCE_SETUP], 0, source); } -static void -proxy_discovered_cb (GESDiscovererManager * self, - GstDiscovererInfo * info, GError * err, gpointer user_data) +static gboolean +cleanup_discoverer_cb (GESDiscovererData * discoverer_data) { - g_signal_emit (self, signals[DISCOVERER_SIGNAL], 0, info, err); + GESDiscovererManager *self = g_weak_ref_get (&discoverer_data->manager); + gint res = G_SOURCE_CONTINUE; + + if (!self) { + return G_SOURCE_REMOVE; + } + + if (discoverer_data->n_uri > 0) { + GST_DEBUG_OBJECT (self, "Discoverer still has %d uris to discover", + discoverer_data->n_uri); + goto done; + } + + g_mutex_lock (&self->lock); + res = G_SOURCE_REMOVE; + GST_DEBUG_OBJECT (self, "Removing unused discoverer"); + g_hash_table_remove (self->discoverers, discoverer_data->thread); + +done: + g_mutex_unlock (&self->lock); + g_object_unref (self); + + return res; } +static void +proxy_discovered_cb (GESDiscovererManager * self, + GstDiscovererInfo * info, GError * err, GstDiscoverer * discoverer) +{ + g_signal_emit (self, signals[DISCOVERER_SIGNAL], 0, info, err); -static GstDiscoverer * + g_mutex_lock (&self->lock); + GESDiscovererData *data = + g_hash_table_lookup (self->discoverers, g_thread_self ()); + if (data) { + data->n_uri--; + data = g_atomic_rc_box_acquire (data); + } + g_mutex_unlock (&self->lock); + + if (data) { + ges_timeout_add (1000, (GSourceFunc) cleanup_discoverer_cb, data, + (GDestroyNotify) ges_discoverer_data_unref); + } +} + +static GESDiscovererData * create_discoverer (GESDiscovererManager * self) { GstDiscoverer *discoverer; + GESDiscovererData *data = g_atomic_rc_box_new0 (GESDiscovererData); discoverer = gst_discoverer_new (self->timeout, NULL); - g_signal_connect_swapped (discoverer, "load-serialized-info", + data->thread = g_thread_self (); + g_weak_ref_set (&data->manager, self); + data->load_serialized_info_id = + g_signal_connect_swapped (discoverer, "load-serialized-info", G_CALLBACK (proxy_load_serialized_info_cb), self); - g_signal_connect_swapped (discoverer, "source-setup", + data->source_setup_id = + g_signal_connect_swapped (discoverer, "source-setup", G_CALLBACK (source_setup_cb), self); - g_signal_connect_swapped (discoverer, "discovered", + data->discovered_id = + g_signal_connect_swapped (discoverer, "discovered", G_CALLBACK (proxy_discovered_cb), self); g_object_set (discoverer, "use-cache", self->use_cache, NULL); gst_discoverer_start (discoverer); - return discoverer; + data->discoverer = discoverer; + + return data; } -static GstDiscoverer * +static GESDiscovererData * ges_discoverer_manager_get_discoverer (GESDiscovererManager * self) { - GstDiscoverer *ret; + GESDiscovererData *ret; g_return_val_if_fail (GES_IS_DISCOVERER_MANAGER (self), NULL); @@ -355,19 +456,19 @@ gboolean ges_discoverer_manager_start_discovery (GESDiscovererManager * self, const gchar * uri) { - GstDiscoverer *discoverer; + GESDiscovererData *disco_data; g_return_val_if_fail (uri != NULL, FALSE); - discoverer = ges_discoverer_manager_get_discoverer (self); - - gboolean res = gst_discoverer_discover_uri_async (discoverer, uri); + disco_data = ges_discoverer_manager_get_discoverer (self); g_mutex_lock (&self->lock); - g_hash_table_insert (self->discoverers, g_thread_self (), discoverer); + gboolean res = + gst_discoverer_discover_uri_async (disco_data->discoverer, uri); + disco_data->n_uri++; + g_hash_table_insert (self->discoverers, g_thread_self (), disco_data); g_mutex_unlock (&self->lock); - return res; } diff --git a/subprojects/gst-editing-services/ges/ges-internal.h b/subprojects/gst-editing-services/ges/ges-internal.h index fe3759da6e..785b2f8ddb 100644 --- a/subprojects/gst-editing-services/ges/ges-internal.h +++ b/subprojects/gst-editing-services/ges/ges-internal.h @@ -423,6 +423,8 @@ ges_get_compositor_factory (void); G_GNUC_INTERNAL void ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify); +G_GNUC_INTERNAL void +ges_timeout_add (guint interval, GSourceFunc func, gpointer udata, GDestroyNotify notify); G_GNUC_INTERNAL gboolean ges_util_structure_get_clocktime (GstStructure *structure, const gchar *name, diff --git a/subprojects/gst-editing-services/ges/ges-utils.c b/subprojects/gst-editing-services/ges/ges-utils.c index 43c48bd050..efc0c000e7 100644 --- a/subprojects/gst-editing-services/ges/ges-utils.c +++ b/subprojects/gst-editing-services/ges/ges-utils.c @@ -280,6 +280,20 @@ ges_get_compositor_factory (void) return compositor_factory; } +void +ges_timeout_add (guint interval, GSourceFunc func, gpointer udata, + GDestroyNotify notify) +{ + GMainContext *context = g_main_context_get_thread_default (); + GSource *source = g_timeout_source_new (interval); + + if (!context) + context = g_main_context_default (); + + g_source_set_callback (source, func, udata, notify); + g_source_attach (source, context); +} + void ges_idle_add (GSourceFunc func, gpointer udata, GDestroyNotify notify) {