From 16b68e7de01a3f20cc760ee7f0e5aeb3e171120e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 26 Mar 2009 18:46:35 +0100 Subject: [PATCH] clock: rework the wakeup of entries. Keep a counter for the amount of outstanding wakeups that we produce and only perform a write/read to the control socket when 1 or 0 respectively. don't poll when waiting for the entries to be unblocked and clear their wakeup counts, just act on the signal when the wakeup count is 0. unscheduled entries will clear their wakeup count themselves. Keep track of when we wakeup the async thread because the list of entries has changed. don't try to see if the list changed because we can't really know when one entry is added multiple times. Only wake up the async thread when we add an async entry to the head of the list and the old entry was BUSY. --- gst/gstsystemclock.c | 125 +++++++++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 52 deletions(-) diff --git a/gst/gstsystemclock.c b/gst/gstsystemclock.c index b65c0d877b..84be9a911f 100644 --- a/gst/gstsystemclock.c +++ b/gst/gstsystemclock.c @@ -54,7 +54,8 @@ struct _GstSystemClockPrivate { GstClockType clock_type; GstPoll *timer; - gint async_wakeup_count; + gint wakeup_count; /* the number of entries with a pending wakeup */ + gboolean async_wakeup; /* if the wakeup was because of a async list change */ }; #define GST_SYSTEM_CLOCK_GET_PRIVATE(obj) \ @@ -306,29 +307,46 @@ gst_system_clock_obtain (void) } static void -gst_system_clock_clear_async_wakeups_unlocked (GstSystemClock * sysclock) +gst_system_clock_remove_wakeup (GstSystemClock * sysclock) { - while (sysclock->priv->async_wakeup_count > 0) { + g_return_if_fail (sysclock->priv->wakeup_count > 0); + + sysclock->priv->wakeup_count--; + if (sysclock->priv->wakeup_count == 0) { + /* read the control socket byte when we removed the last wakeup count */ GST_CAT_DEBUG (GST_CAT_CLOCK, "reading control"); while (!gst_poll_read_control (sysclock->priv->timer)) { g_warning ("gstsystemclock: read control failed, trying again\n"); } - sysclock->priv->async_wakeup_count--; + GST_CLOCK_BROADCAST (sysclock); } - GST_CLOCK_BROADCAST (sysclock); + GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup count %d", + sysclock->priv->wakeup_count); } static void -gst_system_clock_wakeup_async_unlocked (GstSystemClock * sysclock) +gst_system_clock_add_wakeup (GstSystemClock * sysclock) { - GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); - - while (!gst_poll_write_control (sysclock->priv->timer)) { - g_warning - ("gstsystemclock: write control failed in wakeup_async, trying again : %d:%s\n", - errno, g_strerror (errno)); + /* only write the control socket for the first wakeup */ + if (sysclock->priv->wakeup_count == 0) { + GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); + while (!gst_poll_write_control (sysclock->priv->timer)) { + g_warning + ("gstsystemclock: write control failed in wakeup_async, trying again : %d:%s\n", + errno, g_strerror (errno)); + } + } + sysclock->priv->wakeup_count++; + GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup count %d", + sysclock->priv->wakeup_count); +} + +static void +gst_system_clock_wait_wakeup (GstSystemClock * sysclock) +{ + while (sysclock->priv->wakeup_count > 0) { + GST_CLOCK_WAIT (sysclock); } - sysclock->priv->async_wakeup_count++; } /* this thread reads the sorted clock entries from the queue. @@ -374,12 +392,19 @@ gst_system_clock_async_thread (GstClock * clock) /* if it was unscheduled, just move on to the next entry */ if (entry->status == GST_CLOCK_UNSCHEDULED) { GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p was unscheduled", entry); - gst_system_clock_clear_async_wakeups_unlocked (sysclock); goto next_entry; } requested = entry->time; + /* see if we have a pending wakeup because the order of the list + * changed. */ + if (sysclock->priv->async_wakeup) { + GST_CAT_DEBUG (GST_CAT_CLOCK, "clear async wakeup", entry); + gst_system_clock_remove_wakeup (sysclock); + sysclock->priv->async_wakeup = FALSE; + } + /* now wait for the entry, we already hold the lock */ res = gst_system_clock_id_wait_jitter_unlocked (clock, (GstClockID) entry, @@ -395,7 +420,7 @@ gst_system_clock_async_thread (GstClock * clock) { /* entry timed out normally, fire the callback and move to the next * entry */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p unlocked", entry); + GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p timed out", entry); if (entry->func) { /* unlock before firing the callback */ GST_OBJECT_UNLOCK (clock); @@ -403,10 +428,6 @@ gst_system_clock_async_thread (GstClock * clock) entry->user_data); GST_OBJECT_LOCK (clock); } - if (clock->entries->data != entry) { - /* new entries have been added, clear async wakeups */ - gst_system_clock_clear_async_wakeups_unlocked (sysclock); - } if (entry->type == GST_CLOCK_ENTRY_PERIODIC) { GST_CAT_DEBUG (GST_CAT_CLOCK, "updating periodic entry %p", entry); /* adjust time now */ @@ -417,6 +438,7 @@ gst_system_clock_async_thread (GstClock * clock) /* and restart */ continue; } else { + GST_CAT_DEBUG (GST_CAT_CLOCK, "moving to next entry"); goto next_entry; } } @@ -426,15 +448,11 @@ gst_system_clock_async_thread (GstClock * clock) * was canceled. Whatever it is, pick the head entry of the list and * continue waiting. */ GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p needs restart", entry); - /* clear async wakeups, if any */ - gst_system_clock_clear_async_wakeups_unlocked (sysclock); - if (clock->entries->data != entry) { - /* if the new head is not this entry, we set the entry back to the OK - * state. This is needed so that the _unschedule() code can see if an - * entry is currently being waited on (when its state is BUSY). */ - entry->status = GST_CLOCK_OK; - } + /* we set the entry back to the OK state. This is needed so that the + * _unschedule() code can see if an entry is currently being waited + * on (when its state is BUSY). */ + entry->status = GST_CLOCK_OK; continue; default: GST_CAT_DEBUG (GST_CAT_CLOCK, @@ -576,12 +594,8 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock, /* another thread can read the fd before we get the lock */ GST_OBJECT_LOCK (clock); if (entry->status == GST_CLOCK_UNSCHEDULED) { - GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked", entry); - while (!gst_poll_read_control (sysclock->priv->timer)) { - g_warning ("gstsystemclock: read control failed, trying again\n"); - } - GST_CLOCK_BROADCAST (clock); + gst_system_clock_remove_wakeup (sysclock); } else { if (pollret != 0) { /* some other id got unlocked */ @@ -594,14 +608,12 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock, /* mark ourselves as EARLY, we release the lock and we could be * unscheduled ourselves but we don't want the unscheduling thread - * to write on the fd */ + * to write on the control socket (it does that when an entry has a + * BUSY status). */ entry->status = GST_CLOCK_EARLY; - /* before waiting on the cond, check if another thread read the fd - * before we got the lock */ - while (gst_poll_wait (sysclock->priv->timer, 0) > 0) { - GST_CLOCK_WAIT (clock); - } + /* wait till all the entries got woken up */ + gst_system_clock_wait_wakeup (sysclock); /* we released the lock in the wait, recheck our status */ if (entry->status == GST_CLOCK_UNSCHEDULED) { @@ -705,8 +717,8 @@ no_thread: static GstClockReturn gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) { - gboolean empty; GstSystemClock *sysclock; + GstClockEntry *head; sysclock = GST_SYSTEM_CLOCK_CAST (clock); @@ -718,7 +730,10 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) if (!gst_system_clock_start_async (sysclock)) goto thread_error; - empty = (clock->entries == NULL); + if (clock->entries) + head = clock->entries->data; + else + head = NULL; /* need to take a ref */ gst_clock_id_ref ((GstClockID) entry); @@ -731,15 +746,21 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) * will get to this entry automatically. */ if (clock->entries->data == entry) { GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head"); - if (empty) { + if (head == NULL) { /* the list was empty before, signal the cond so that the async thread can * start taking a look at the queue */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal"); + GST_CAT_DEBUG (GST_CAT_CLOCK, "first entry, sending signal"); GST_CLOCK_BROADCAST (clock); } else { - /* the async thread was waiting for an entry, unlock the wait so that it - * looks at the new head entry instead */ - gst_system_clock_wakeup_async_unlocked (sysclock); + if (head->status == GST_CLOCK_BUSY) { + /* the async thread was waiting for an entry, unlock the wait so that it + * looks at the new head entry instead, we only need to do this once */ + if (!sysclock->priv->async_wakeup) { + GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup async thread"); + sysclock->priv->async_wakeup = TRUE; + gst_system_clock_add_wakeup (sysclock); + } + } } } GST_OBJECT_UNLOCK (clock); @@ -761,20 +782,20 @@ thread_error: static void gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry) { + GstSystemClock *sysclock; + + sysclock = GST_SYSTEM_CLOCK_CAST (clock); + GST_CAT_DEBUG (GST_CAT_CLOCK, "unscheduling entry %p", entry); GST_OBJECT_LOCK (clock); if (entry->status == GST_CLOCK_BUSY) { /* the entry was being busy, wake up all entries so that they recheck their * status. We cannot wake up just one entry because allocating such a - * datastructure for each entry would be too heavey and unlocking an entry + * datastructure for each entry would be too heavy and unlocking an entry * is usually done when shutting down or some other exceptional case. */ - GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); - while (!gst_poll_write_control (GST_SYSTEM_CLOCK_CAST (clock)->priv->timer)) { - g_warning - ("gstsystemclock: write control failed in unschedule, trying again\n : %d:%s\n", - errno, g_strerror (errno)); - } + GST_CAT_DEBUG (GST_CAT_CLOCK, "entry was BUSY, doing wakeup"); + gst_system_clock_add_wakeup (sysclock); } /* when it leaves the poll, it'll detect the unscheduled */ entry->status = GST_CLOCK_UNSCHEDULED;