clock: rework the wakeup of entries.

Keep a counter for the amount of outstanding wakeups that we produce and only
perform a write/read to the control socket when 1 or 0 respectively.
don't poll when waiting for the entries to be unblocked and clear their wakeup
counts, just act on the signal when the wakeup count is 0.
unscheduled entries will clear their wakeup count themselves.
Keep track of when we wakeup the async thread because the list of entries has
changed.
don't try to see if the list changed because we can't really know when one entry
is added multiple times.
Only wake up the async thread when we add an async entry to the head of the list
and the old entry was BUSY.
This commit is contained in:
Wim Taymans 2009-03-26 18:46:35 +01:00
parent 31669a4819
commit 16b68e7de0

View file

@ -54,7 +54,8 @@ struct _GstSystemClockPrivate
{ {
GstClockType clock_type; GstClockType clock_type;
GstPoll *timer; GstPoll *timer;
gint async_wakeup_count; gint wakeup_count; /* the number of entries with a pending wakeup */
gboolean async_wakeup; /* if the wakeup was because of a async list change */
}; };
#define GST_SYSTEM_CLOCK_GET_PRIVATE(obj) \ #define GST_SYSTEM_CLOCK_GET_PRIVATE(obj) \
@ -306,29 +307,46 @@ gst_system_clock_obtain (void)
} }
static void static void
gst_system_clock_clear_async_wakeups_unlocked (GstSystemClock * sysclock) gst_system_clock_remove_wakeup (GstSystemClock * sysclock)
{ {
while (sysclock->priv->async_wakeup_count > 0) { g_return_if_fail (sysclock->priv->wakeup_count > 0);
sysclock->priv->wakeup_count--;
if (sysclock->priv->wakeup_count == 0) {
/* read the control socket byte when we removed the last wakeup count */
GST_CAT_DEBUG (GST_CAT_CLOCK, "reading control"); GST_CAT_DEBUG (GST_CAT_CLOCK, "reading control");
while (!gst_poll_read_control (sysclock->priv->timer)) { while (!gst_poll_read_control (sysclock->priv->timer)) {
g_warning ("gstsystemclock: read control failed, trying again\n"); g_warning ("gstsystemclock: read control failed, trying again\n");
} }
sysclock->priv->async_wakeup_count--;
}
GST_CLOCK_BROADCAST (sysclock); GST_CLOCK_BROADCAST (sysclock);
} }
GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup count %d",
sysclock->priv->wakeup_count);
}
static void static void
gst_system_clock_wakeup_async_unlocked (GstSystemClock * sysclock) gst_system_clock_add_wakeup (GstSystemClock * sysclock)
{ {
/* only write the control socket for the first wakeup */
if (sysclock->priv->wakeup_count == 0) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control");
while (!gst_poll_write_control (sysclock->priv->timer)) { while (!gst_poll_write_control (sysclock->priv->timer)) {
g_warning g_warning
("gstsystemclock: write control failed in wakeup_async, trying again : %d:%s\n", ("gstsystemclock: write control failed in wakeup_async, trying again : %d:%s\n",
errno, g_strerror (errno)); errno, g_strerror (errno));
} }
sysclock->priv->async_wakeup_count++; }
sysclock->priv->wakeup_count++;
GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup count %d",
sysclock->priv->wakeup_count);
}
static void
gst_system_clock_wait_wakeup (GstSystemClock * sysclock)
{
while (sysclock->priv->wakeup_count > 0) {
GST_CLOCK_WAIT (sysclock);
}
} }
/* this thread reads the sorted clock entries from the queue. /* this thread reads the sorted clock entries from the queue.
@ -374,12 +392,19 @@ gst_system_clock_async_thread (GstClock * clock)
/* if it was unscheduled, just move on to the next entry */ /* if it was unscheduled, just move on to the next entry */
if (entry->status == GST_CLOCK_UNSCHEDULED) { if (entry->status == GST_CLOCK_UNSCHEDULED) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p was unscheduled", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p was unscheduled", entry);
gst_system_clock_clear_async_wakeups_unlocked (sysclock);
goto next_entry; goto next_entry;
} }
requested = entry->time; requested = entry->time;
/* see if we have a pending wakeup because the order of the list
* changed. */
if (sysclock->priv->async_wakeup) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "clear async wakeup", entry);
gst_system_clock_remove_wakeup (sysclock);
sysclock->priv->async_wakeup = FALSE;
}
/* now wait for the entry, we already hold the lock */ /* now wait for the entry, we already hold the lock */
res = res =
gst_system_clock_id_wait_jitter_unlocked (clock, (GstClockID) entry, gst_system_clock_id_wait_jitter_unlocked (clock, (GstClockID) entry,
@ -395,7 +420,7 @@ gst_system_clock_async_thread (GstClock * clock)
{ {
/* entry timed out normally, fire the callback and move to the next /* entry timed out normally, fire the callback and move to the next
* entry */ * entry */
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p unlocked", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p timed out", entry);
if (entry->func) { if (entry->func) {
/* unlock before firing the callback */ /* unlock before firing the callback */
GST_OBJECT_UNLOCK (clock); GST_OBJECT_UNLOCK (clock);
@ -403,10 +428,6 @@ gst_system_clock_async_thread (GstClock * clock)
entry->user_data); entry->user_data);
GST_OBJECT_LOCK (clock); GST_OBJECT_LOCK (clock);
} }
if (clock->entries->data != entry) {
/* new entries have been added, clear async wakeups */
gst_system_clock_clear_async_wakeups_unlocked (sysclock);
}
if (entry->type == GST_CLOCK_ENTRY_PERIODIC) { if (entry->type == GST_CLOCK_ENTRY_PERIODIC) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "updating periodic entry %p", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "updating periodic entry %p", entry);
/* adjust time now */ /* adjust time now */
@ -417,6 +438,7 @@ gst_system_clock_async_thread (GstClock * clock)
/* and restart */ /* and restart */
continue; continue;
} else { } else {
GST_CAT_DEBUG (GST_CAT_CLOCK, "moving to next entry");
goto next_entry; goto next_entry;
} }
} }
@ -426,15 +448,11 @@ gst_system_clock_async_thread (GstClock * clock)
* was canceled. Whatever it is, pick the head entry of the list and * was canceled. Whatever it is, pick the head entry of the list and
* continue waiting. */ * continue waiting. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p needs restart", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p needs restart", entry);
/* clear async wakeups, if any */
gst_system_clock_clear_async_wakeups_unlocked (sysclock);
if (clock->entries->data != entry) { /* we set the entry back to the OK state. This is needed so that the
/* if the new head is not this entry, we set the entry back to the OK * _unschedule() code can see if an entry is currently being waited
* state. This is needed so that the _unschedule() code can see if an * on (when its state is BUSY). */
* entry is currently being waited on (when its state is BUSY). */
entry->status = GST_CLOCK_OK; entry->status = GST_CLOCK_OK;
}
continue; continue;
default: default:
GST_CAT_DEBUG (GST_CAT_CLOCK, GST_CAT_DEBUG (GST_CAT_CLOCK,
@ -576,12 +594,8 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock,
/* another thread can read the fd before we get the lock */ /* another thread can read the fd before we get the lock */
GST_OBJECT_LOCK (clock); GST_OBJECT_LOCK (clock);
if (entry->status == GST_CLOCK_UNSCHEDULED) { if (entry->status == GST_CLOCK_UNSCHEDULED) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked", entry);
while (!gst_poll_read_control (sysclock->priv->timer)) { gst_system_clock_remove_wakeup (sysclock);
g_warning ("gstsystemclock: read control failed, trying again\n");
}
GST_CLOCK_BROADCAST (clock);
} else { } else {
if (pollret != 0) { if (pollret != 0) {
/* some other id got unlocked */ /* some other id got unlocked */
@ -594,14 +608,12 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock,
/* mark ourselves as EARLY, we release the lock and we could be /* mark ourselves as EARLY, we release the lock and we could be
* unscheduled ourselves but we don't want the unscheduling thread * unscheduled ourselves but we don't want the unscheduling thread
* to write on the fd */ * to write on the control socket (it does that when an entry has a
* BUSY status). */
entry->status = GST_CLOCK_EARLY; entry->status = GST_CLOCK_EARLY;
/* before waiting on the cond, check if another thread read the fd /* wait till all the entries got woken up */
* before we got the lock */ gst_system_clock_wait_wakeup (sysclock);
while (gst_poll_wait (sysclock->priv->timer, 0) > 0) {
GST_CLOCK_WAIT (clock);
}
/* we released the lock in the wait, recheck our status */ /* we released the lock in the wait, recheck our status */
if (entry->status == GST_CLOCK_UNSCHEDULED) { if (entry->status == GST_CLOCK_UNSCHEDULED) {
@ -705,8 +717,8 @@ no_thread:
static GstClockReturn static GstClockReturn
gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry) gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
{ {
gboolean empty;
GstSystemClock *sysclock; GstSystemClock *sysclock;
GstClockEntry *head;
sysclock = GST_SYSTEM_CLOCK_CAST (clock); sysclock = GST_SYSTEM_CLOCK_CAST (clock);
@ -718,7 +730,10 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
if (!gst_system_clock_start_async (sysclock)) if (!gst_system_clock_start_async (sysclock))
goto thread_error; goto thread_error;
empty = (clock->entries == NULL); if (clock->entries)
head = clock->entries->data;
else
head = NULL;
/* need to take a ref */ /* need to take a ref */
gst_clock_id_ref ((GstClockID) entry); gst_clock_id_ref ((GstClockID) entry);
@ -731,15 +746,21 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
* will get to this entry automatically. */ * will get to this entry automatically. */
if (clock->entries->data == entry) { if (clock->entries->data == entry) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head"); GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head");
if (empty) { if (head == NULL) {
/* the list was empty before, signal the cond so that the async thread can /* the list was empty before, signal the cond so that the async thread can
* start taking a look at the queue */ * start taking a look at the queue */
GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal"); GST_CAT_DEBUG (GST_CAT_CLOCK, "first entry, sending signal");
GST_CLOCK_BROADCAST (clock); GST_CLOCK_BROADCAST (clock);
} else { } else {
if (head->status == GST_CLOCK_BUSY) {
/* the async thread was waiting for an entry, unlock the wait so that it /* the async thread was waiting for an entry, unlock the wait so that it
* looks at the new head entry instead */ * looks at the new head entry instead, we only need to do this once */
gst_system_clock_wakeup_async_unlocked (sysclock); if (!sysclock->priv->async_wakeup) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup async thread");
sysclock->priv->async_wakeup = TRUE;
gst_system_clock_add_wakeup (sysclock);
}
}
} }
} }
GST_OBJECT_UNLOCK (clock); GST_OBJECT_UNLOCK (clock);
@ -761,20 +782,20 @@ thread_error:
static void static void
gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry) gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry)
{ {
GstSystemClock *sysclock;
sysclock = GST_SYSTEM_CLOCK_CAST (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "unscheduling entry %p", entry); GST_CAT_DEBUG (GST_CAT_CLOCK, "unscheduling entry %p", entry);
GST_OBJECT_LOCK (clock); GST_OBJECT_LOCK (clock);
if (entry->status == GST_CLOCK_BUSY) { if (entry->status == GST_CLOCK_BUSY) {
/* the entry was being busy, wake up all entries so that they recheck their /* the entry was being busy, wake up all entries so that they recheck their
* status. We cannot wake up just one entry because allocating such a * status. We cannot wake up just one entry because allocating such a
* datastructure for each entry would be too heavey and unlocking an entry * datastructure for each entry would be too heavy and unlocking an entry
* is usually done when shutting down or some other exceptional case. */ * is usually done when shutting down or some other exceptional case. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control"); GST_CAT_DEBUG (GST_CAT_CLOCK, "entry was BUSY, doing wakeup");
while (!gst_poll_write_control (GST_SYSTEM_CLOCK_CAST (clock)->priv->timer)) { gst_system_clock_add_wakeup (sysclock);
g_warning
("gstsystemclock: write control failed in unschedule, trying again\n : %d:%s\n",
errno, g_strerror (errno));
}
} }
/* when it leaves the poll, it'll detect the unscheduled */ /* when it leaves the poll, it'll detect the unscheduled */
entry->status = GST_CLOCK_UNSCHEDULED; entry->status = GST_CLOCK_UNSCHEDULED;