atomicqueue: fix race

After a writer has written to its reserved write location, it can only make the
location available for reading if all of the writers with lower locations have
finished.
This commit is contained in:
Wim Taymans 2012-02-24 15:24:42 +01:00
parent 61e8fa0dc5
commit ed29f33ddb

View file

@ -56,7 +56,7 @@ struct _GstAQueueMem
gint size; gint size;
gpointer *array; gpointer *array;
volatile gint head; volatile gint head;
volatile gint tail; volatile gint tail_write;
volatile gint tail_read; volatile gint tail_read;
GstAQueueMem *next; GstAQueueMem *next;
GstAQueueMem *free; GstAQueueMem *free;
@ -84,7 +84,7 @@ new_queue_mem (guint size, gint pos)
mem->size = clp2 (MAX (size, 16)) - 1; mem->size = clp2 (MAX (size, 16)) - 1;
mem->array = g_new0 (gpointer, mem->size + 1); mem->array = g_new0 (gpointer, mem->size + 1);
mem->head = pos; mem->head = pos;
mem->tail = pos; mem->tail_write = pos;
mem->tail_read = pos; mem->tail_read = pos;
mem->next = NULL; mem->next = NULL;
mem->free = NULL; mem->free = NULL;
@ -297,7 +297,8 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
size = head_mem->size; size = head_mem->size;
/* when we are not empty, we can continue */ /* when we are not empty, we can continue */
if (G_LIKELY (head != tail)) if G_LIKELY
(head != tail)
break; break;
/* else array empty, try to take next */ /* else array empty, try to take next */
@ -307,7 +308,8 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
/* now we try to move the next array as the head memory. If we fail to do that, /* now we try to move the next array as the head memory. If we fail to do that,
* some other reader managed to do it first and we retry */ * some other reader managed to do it first and we retry */
if (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem, if G_UNLIKELY
(!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
next)) next))
continue; continue;
@ -318,8 +320,8 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
} }
ret = head_mem->array[head & size]; ret = head_mem->array[head & size];
} while (!g_atomic_int_compare_and_exchange (&head_mem->head, head, } while G_UNLIKELY
head + 1)); (!g_atomic_int_compare_and_exchange (&head_mem->head, head, head + 1));
#ifdef LOW_MEM #ifdef LOW_MEM
/* decrement number of readers, when we reach 0 readers we can be sure that /* decrement number of readers, when we reach 0 readers we can be sure that
@ -354,18 +356,20 @@ gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
tail_mem = g_atomic_pointer_get (&queue->tail_mem); tail_mem = g_atomic_pointer_get (&queue->tail_mem);
head = g_atomic_int_get (&tail_mem->head); head = g_atomic_int_get (&tail_mem->head);
tail = g_atomic_int_get (&tail_mem->tail); tail = g_atomic_int_get (&tail_mem->tail_write);
size = tail_mem->size; size = tail_mem->size;
/* we're not full, continue */ /* we're not full, continue */
if (tail - head <= size) if G_LIKELY
(tail - head <= size)
break; break;
/* else we need to grow the array, we store a mask so we have to add 1 */ /* else we need to grow the array, we store a mask so we have to add 1 */
mem = new_queue_mem ((size << 1) + 1, tail); mem = new_queue_mem ((size << 1) + 1, tail);
/* try to make our new array visible to other writers */ /* try to make our new array visible to other writers */
if (!g_atomic_pointer_compare_and_exchange (&queue->tail_mem, tail_mem, if G_UNLIKELY
(!g_atomic_pointer_compare_and_exchange (&queue->tail_mem, tail_mem,
mem)) { mem)) {
/* we tried to swap the new writer array but something changed. This is /* we tried to swap the new writer array but something changed. This is
* because some other writer beat us to it, we free our memory and try * because some other writer beat us to it, we free our memory and try
@ -378,13 +382,18 @@ gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
* pointer to the new array */ * pointer to the new array */
g_atomic_pointer_set (&tail_mem->next, mem); g_atomic_pointer_set (&tail_mem->next, mem);
} }
} while (!g_atomic_int_compare_and_exchange (&tail_mem->tail, tail, } while G_UNLIKELY
tail + 1)); (!g_atomic_int_compare_and_exchange (&tail_mem->tail_write, tail, tail + 1));
tail_mem->array[tail & size] = data; tail_mem->array[tail & size] = data;
/* and now the readers can read */ /* now wait until all writers have completed their write before we move the
g_atomic_int_inc (&tail_mem->tail_read); * tail_read to this new item. It is possible that other writers are still
* updating the previous array slots and we don't want to reveal their changes
* before they are done. FIXME, it would be nice if we didn't have to busy
* wait here. */
while G_UNLIKELY
(!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1));
} }
/** /**