cudamemory: Make GstCudaStream-aware

This will be used for CUDA stream sharing.

* Add the GstCudaPoolAllocator object. The pool allocator will
  control synchronization of allocated memory objects.
* Modify the gst_cuda_allocator_alloc() API so that the caller can specify
  the GstCudaStream object for the newly allocated memory.
* The GST_CUDA_MEMORY_TRANSFER_NEED_SYNC flag is added in addition to the
  existing GST_CUDA_MEMORY_TRANSFER_NEED_{UPLOAD,DOWNLOAD} flags.
  The flag indicates that GPU commands queued in the CUDA stream
  may not have finished yet, and that the caller should take care of the
  synchronization.
  The flag is controlled by the GstCudaMemory object if the memory holds
  a GstCudaStream. (Otherwise, GstCudaMemory will do synchronization
  as before this commit.) Specifically, the GstCudaMemory object will set
  the new flag automatically when memory is mapped with
  (GST_MAP_CUDA | GST_MAP_WRITE) flags. The caller will need to unset
  the flag via GST_MEMORY_FLAG_UNSET() if the memory is already synchronized
  by client code.
* The gst_cuda_memory_sync() helper function is added to perform
  synchronization.
* Why not use a CUevent object to keep track of synchronization status?
  CUDA already provides a fence-like interface via the CUevent object,
  but the cuEventRecord/cuEventQuery APIs are not zero-cost operations.
  Instead, in this version, the status is tracked by using map and
  object flags.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3629>
This commit is contained in:
Seungha Yang 2022-12-19 21:56:37 +09:00 committed by GStreamer Marge Bot
parent 9eaae61a44
commit 30d06e03c2
3 changed files with 764 additions and 37 deletions

View file

@ -73,7 +73,7 @@ gst_cuda_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
return FALSE; return FALSE;
} }
mem = gst_cuda_allocator_alloc (NULL, self->context, &info); mem = gst_cuda_allocator_alloc (NULL, self->context, NULL, &info);
if (!mem) { if (!mem) {
GST_WARNING_OBJECT (self, "Failed to allocate memory"); GST_WARNING_OBJECT (self, "Failed to allocate memory");
return FALSE; return FALSE;
@ -102,7 +102,7 @@ gst_cuda_buffer_pool_alloc (GstBufferPool * pool, GstBuffer ** buffer,
GstMemory *mem; GstMemory *mem;
GstCudaMemory *cmem; GstCudaMemory *cmem;
mem = gst_cuda_allocator_alloc (NULL, self->context, &priv->info); mem = gst_cuda_allocator_alloc (NULL, self->context, NULL, &priv->info);
if (!mem) { if (!mem) {
GST_WARNING_OBJECT (pool, "Cannot create CUDA memory"); GST_WARNING_OBJECT (pool, "Cannot create CUDA memory");
return GST_FLOW_ERROR; return GST_FLOW_ERROR;

View file

@ -42,10 +42,18 @@ struct _GstCudaMemoryPrivate
guint height; guint height;
GMutex lock; GMutex lock;
GstCudaStream *stream;
};
struct _GstCudaAllocatorPrivate
{
GstMemoryCopyFunction fallback_copy;
}; };
#define gst_cuda_allocator_parent_class parent_class #define gst_cuda_allocator_parent_class parent_class
G_DEFINE_TYPE (GstCudaAllocator, gst_cuda_allocator, GST_TYPE_ALLOCATOR); G_DEFINE_TYPE_WITH_PRIVATE (GstCudaAllocator, gst_cuda_allocator,
GST_TYPE_ALLOCATOR);
static void gst_cuda_allocator_free (GstAllocator * allocator, static void gst_cuda_allocator_free (GstAllocator * allocator,
GstMemory * memory); GstMemory * memory);
@ -78,13 +86,17 @@ static void
gst_cuda_allocator_init (GstCudaAllocator * allocator) gst_cuda_allocator_init (GstCudaAllocator * allocator)
{ {
GstAllocator *alloc = GST_ALLOCATOR_CAST (allocator); GstAllocator *alloc = GST_ALLOCATOR_CAST (allocator);
GstCudaAllocatorPrivate *priv;
GST_DEBUG_OBJECT (allocator, "init"); priv = allocator->priv = gst_cuda_allocator_get_instance_private (allocator);
alloc->mem_type = GST_CUDA_MEMORY_TYPE_NAME; alloc->mem_type = GST_CUDA_MEMORY_TYPE_NAME;
alloc->mem_map = cuda_mem_map; alloc->mem_map = cuda_mem_map;
alloc->mem_unmap_full = cuda_mem_unmap_full; alloc->mem_unmap_full = cuda_mem_unmap_full;
/* Store pointer to default mem_copy method for fallback copy */
priv->fallback_copy = alloc->mem_copy;
alloc->mem_copy = cuda_mem_copy; alloc->mem_copy = cuda_mem_copy;
GST_OBJECT_FLAG_SET (allocator, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC); GST_OBJECT_FLAG_SET (allocator, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC);
@ -92,7 +104,7 @@ gst_cuda_allocator_init (GstCudaAllocator * allocator)
static GstMemory * static GstMemory *
gst_cuda_allocator_alloc_internal (GstCudaAllocator * self, gst_cuda_allocator_alloc_internal (GstCudaAllocator * self,
GstCudaContext * context, const GstVideoInfo * info, GstCudaContext * context, GstCudaStream * stream, const GstVideoInfo * info,
guint width_in_bytes, guint alloc_height) guint width_in_bytes, guint alloc_height)
{ {
GstCudaMemoryPrivate *priv; GstCudaMemoryPrivate *priv;
@ -123,6 +135,8 @@ gst_cuda_allocator_alloc_internal (GstCudaAllocator * self,
priv->width_in_bytes = width_in_bytes; priv->width_in_bytes = width_in_bytes;
priv->height = alloc_height; priv->height = alloc_height;
g_mutex_init (&priv->lock); g_mutex_init (&priv->lock);
if (stream)
priv->stream = gst_cuda_stream_ref (stream);
mem->context = gst_object_ref (context); mem->context = gst_object_ref (context);
mem->info = *info; mem->info = *info;
@ -221,6 +235,12 @@ gst_cuda_allocator_free (GstAllocator * allocator, GstMemory * memory)
GstCudaMemoryPrivate *priv = mem->priv; GstCudaMemoryPrivate *priv = mem->priv;
gst_cuda_context_push (mem->context); gst_cuda_context_push (mem->context);
/* Finish any pending operations before freeing */
if (priv->stream &&
GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC)) {
CuStreamSynchronize (gst_cuda_stream_get_handle (priv->stream));
}
if (priv->data) if (priv->data)
gst_cuda_result (CuMemFree (priv->data)); gst_cuda_result (CuMemFree (priv->data));
@ -228,6 +248,7 @@ gst_cuda_allocator_free (GstAllocator * allocator, GstMemory * memory)
gst_cuda_result (CuMemFreeHost (priv->staging)); gst_cuda_result (CuMemFreeHost (priv->staging));
gst_cuda_context_pop (NULL); gst_cuda_context_pop (NULL);
gst_clear_cuda_stream (&priv->stream);
gst_object_unref (mem->context); gst_object_unref (mem->context);
g_mutex_clear (&priv->lock); g_mutex_clear (&priv->lock);
@ -241,6 +262,7 @@ gst_cuda_memory_upload (GstCudaAllocator * self, GstCudaMemory * mem)
GstCudaMemoryPrivate *priv = mem->priv; GstCudaMemoryPrivate *priv = mem->priv;
gboolean ret = TRUE; gboolean ret = TRUE;
CUDA_MEMCPY2D param = { 0, }; CUDA_MEMCPY2D param = { 0, };
CUstream stream = gst_cuda_stream_get_handle (priv->stream);
if (!priv->staging || if (!priv->staging ||
!GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD)) { !GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD)) {
@ -262,7 +284,16 @@ gst_cuda_memory_upload (GstCudaAllocator * self, GstCudaMemory * mem)
param.WidthInBytes = priv->width_in_bytes; param.WidthInBytes = priv->width_in_bytes;
param.Height = priv->height; param.Height = priv->height;
ret = gst_cuda_result (CuMemcpy2D (&param)); ret = gst_cuda_result (CuMemcpy2DAsync (&param, stream));
/* Sync only if we use default stream.
* Otherwise (in case of non-default stream case) sync is caller's
* responsibility */
if (!priv->stream) {
CuStreamSynchronize (stream);
GST_MINI_OBJECT_FLAG_UNSET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
} else {
GST_MINI_OBJECT_FLAG_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
}
gst_cuda_context_pop (NULL); gst_cuda_context_pop (NULL);
if (!ret) if (!ret)
@ -277,6 +308,7 @@ gst_cuda_memory_download (GstCudaAllocator * self, GstCudaMemory * mem)
GstCudaMemoryPrivate *priv = mem->priv; GstCudaMemoryPrivate *priv = mem->priv;
gboolean ret = TRUE; gboolean ret = TRUE;
CUDA_MEMCPY2D param = { 0, }; CUDA_MEMCPY2D param = { 0, };
CUstream stream = gst_cuda_stream_get_handle (priv->stream);
if (!GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD)) if (!GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD))
return TRUE; return TRUE;
@ -306,8 +338,11 @@ gst_cuda_memory_download (GstCudaAllocator * self, GstCudaMemory * mem)
param.WidthInBytes = priv->width_in_bytes; param.WidthInBytes = priv->width_in_bytes;
param.Height = priv->height; param.Height = priv->height;
ret = gst_cuda_result (CuMemcpy2D (&param)); ret = gst_cuda_result (CuMemcpy2DAsync (&param, stream));
/* For CPU access, sync immediately */
CuStreamSynchronize (stream);
gst_cuda_context_pop (NULL); gst_cuda_context_pop (NULL);
GST_MINI_OBJECT_FLAG_UNSET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
if (!ret) if (!ret)
GST_ERROR_OBJECT (self, "Failed to upload memory"); GST_ERROR_OBJECT (self, "Failed to upload memory");
@ -330,8 +365,12 @@ cuda_mem_map (GstMemory * mem, gsize maxsize, GstMapFlags flags)
GST_MEMORY_FLAG_UNSET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD); GST_MEMORY_FLAG_UNSET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD);
if ((flags & GST_MAP_WRITE) == GST_MAP_WRITE) if ((flags & GST_MAP_WRITE) == GST_MAP_WRITE) {
GST_MINI_OBJECT_FLAG_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD); GST_MINI_OBJECT_FLAG_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD);
/* Assume that memory needs sync if we are using non-default stream */
if (priv->stream)
GST_MINI_OBJECT_FLAG_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
}
ret = (gpointer) priv->data; ret = (gpointer) priv->data;
goto out; goto out;
@ -388,12 +427,26 @@ cuda_mem_copy (GstMemory * mem, gssize offset, gssize size)
GstCudaContext *context = src_mem->context; GstCudaContext *context = src_mem->context;
GstMapInfo src_info, dst_info; GstMapInfo src_info, dst_info;
CUDA_MEMCPY2D param = { 0, }; CUDA_MEMCPY2D param = { 0, };
GstMemory *copy; GstMemory *copy = NULL;
gboolean ret; gboolean ret;
GstCudaStream *stream = src_mem->priv->stream;
CUstream stream_handle = gst_cuda_stream_get_handle (stream);
/* offset and size are ignored */ /* non-zero offset or different size is not supported */
copy = gst_cuda_allocator_alloc_internal (self, context, if (offset != 0 || (size != -1 && (gsize) size != mem->size)) {
&src_mem->info, src_mem->priv->width_in_bytes, src_mem->priv->height); GST_DEBUG_OBJECT (self, "Different size/offset, try fallback copy");
return self->priv->fallback_copy (mem, offset, size);
}
if (GST_IS_CUDA_POOL_ALLOCATOR (self)) {
gst_cuda_pool_allocator_acquire_memory (GST_CUDA_POOL_ALLOCATOR (self),
&copy);
}
if (!copy) {
copy = gst_cuda_allocator_alloc_internal (self, context, stream,
&src_mem->info, src_mem->priv->width_in_bytes, src_mem->priv->height);
}
if (!copy) { if (!copy) {
GST_ERROR_OBJECT (self, "Failed to allocate memory for copying"); GST_ERROR_OBJECT (self, "Failed to allocate memory for copying");
@ -431,7 +484,8 @@ cuda_mem_copy (GstMemory * mem, gssize offset, gssize size)
param.WidthInBytes = src_mem->priv->width_in_bytes; param.WidthInBytes = src_mem->priv->width_in_bytes;
param.Height = src_mem->priv->height; param.Height = src_mem->priv->height;
ret = gst_cuda_result (CuMemcpy2D (&param)); ret = gst_cuda_result (CuMemcpy2DAsync (&param, stream_handle));
CuStreamSynchronize (stream_handle);
gst_cuda_context_pop (NULL); gst_cuda_context_pop (NULL);
gst_memory_unmap (mem, &src_info); gst_memory_unmap (mem, &src_info);
@ -484,10 +538,61 @@ gst_is_cuda_memory (GstMemory * mem)
GST_IS_CUDA_ALLOCATOR (mem->allocator); GST_IS_CUDA_ALLOCATOR (mem->allocator);
} }
/**
 * gst_cuda_memory_get_stream:
 * @mem: A #GstCudaMemory
 *
 * Looks up the CUDA stream object stored in @mem. The stream is returned
 * as-is, without taking an additional reference.
 *
 * Returns: (transfer none) (nullable): a #GstCudaStream or %NULL if default
 * CUDA stream is in use
 *
 * Since: 1.24
 */
GstCudaStream *
gst_cuda_memory_get_stream (GstCudaMemory * mem)
{
  GstCudaStream *stream;

  g_return_val_if_fail (gst_is_cuda_memory ((GstMemory *) mem), NULL);

  stream = mem->priv->stream;

  return stream;
}
/**
 * gst_cuda_memory_sync:
 * @mem: A #GstCudaMemory
 *
 * Waits for the CUDA stream held by @mem when the memory is flagged with
 * GST_CUDA_MEMORY_TRANSFER_NEED_SYNC, and clears that flag. Does nothing
 * when @mem holds no stream or the flag is not set.
 *
 * Since: 1.24
 */
void
gst_cuda_memory_sync (GstCudaMemory * mem)
{
  GstCudaMemoryPrivate *priv;

  g_return_if_fail (gst_is_cuda_memory ((GstMemory *) mem));

  priv = mem->priv;

  /* No stream object held by this memory: nothing to wait for here */
  if (priv->stream == NULL)
    return;

  g_mutex_lock (&priv->lock);

  if (!GST_MEMORY_FLAG_IS_SET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC)) {
    g_mutex_unlock (&priv->lock);
    return;
  }

  /* The flag is dropped first, then the stream is drained under the
   * memory's own CUDA context */
  GST_MEMORY_FLAG_UNSET (mem, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
  if (gst_cuda_context_push (mem->context)) {
    CuStreamSynchronize (gst_cuda_stream_get_handle (priv->stream));
    gst_cuda_context_pop (NULL);
  }

  g_mutex_unlock (&priv->lock);
}
/** /**
* gst_cuda_allocator_alloc: * gst_cuda_allocator_alloc:
* @allocator: (transfer none) (allow-none): a #GstCudaAllocator * @allocator: (transfer none) (allow-none): a #GstCudaAllocator
* @context: (transfer none): a #GstCudaContext * @context: (transfer none): a #GstCudaContext
* @stream: (transfer none) (allow-none): a #GstCudaStream
* @info: a #GstVideoInfo * @info: a #GstVideoInfo
* *
* Returns: (transfer full) (nullable): a newly allocated #GstCudaMemory * Returns: (transfer full) (nullable): a newly allocated #GstCudaMemory
@ -496,13 +601,20 @@ gst_is_cuda_memory (GstMemory * mem)
*/ */
GstMemory * GstMemory *
gst_cuda_allocator_alloc (GstCudaAllocator * allocator, gst_cuda_allocator_alloc (GstCudaAllocator * allocator,
GstCudaContext * context, const GstVideoInfo * info) GstCudaContext * context, GstCudaStream * stream, const GstVideoInfo * info)
{ {
guint alloc_height; guint alloc_height;
g_return_val_if_fail (GST_IS_CUDA_CONTEXT (context), NULL); g_return_val_if_fail (GST_IS_CUDA_CONTEXT (context), NULL);
g_return_val_if_fail (!stream || GST_IS_CUDA_STREAM (stream), NULL);
g_return_val_if_fail (info != NULL, NULL); g_return_val_if_fail (info != NULL, NULL);
if (stream && stream->context != context) {
GST_ERROR_OBJECT (context,
"stream object is holding different CUDA context");
return NULL;
}
if (!allocator) if (!allocator)
allocator = (GstCudaAllocator *) _gst_cuda_allocator; allocator = (GstCudaAllocator *) _gst_cuda_allocator;
@ -549,6 +661,518 @@ gst_cuda_allocator_alloc (GstCudaAllocator * allocator,
break; break;
} }
return gst_cuda_allocator_alloc_internal (allocator, context, return gst_cuda_allocator_alloc_internal (allocator, context, stream,
info, info->stride[0], alloc_height); info, info->stride[0], alloc_height);
} }
/**
 * gst_cuda_allocator_set_active:
 * @allocator: a #GstCudaAllocator
 * @active: the new active state
 *
 * Controls the active state of @allocator. Default #GstCudaAllocator is
 * stateless and therefore active state is ignored, but subclass implementation
 * (e.g., #GstCudaPoolAllocator) will require explicit active state control
 * for its internal resource management.
 *
 * This method is conceptually identical to gst_buffer_pool_set_active method.
 *
 * Returns: %TRUE if active state of @allocator was successfully updated.
 *
 * Since: 1.24
 */
gboolean
gst_cuda_allocator_set_active (GstCudaAllocator * allocator, gboolean active)
{
  GstCudaAllocatorClass *klass;

  g_return_val_if_fail (GST_IS_CUDA_ALLOCATOR (allocator), FALSE);

  klass = GST_CUDA_ALLOCATOR_GET_CLASS (allocator);

  /* Without a set_active vfunc there is no state to update */
  if (klass->set_active == NULL)
    return TRUE;

  return klass->set_active (allocator, active);
}
/* Atomically reads the flushing flag of a pool allocator.
 * The macro argument is parenthesized so that an arbitrary pointer
 * expression (not only a plain identifier) expands correctly. */
#define GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING(alloc) (g_atomic_int_get (&(alloc)->priv->flushing))
/* Instance-private state of GstCudaPoolAllocator */
struct _GstCudaPoolAllocatorPrivate
{
/* released memory objects kept around for later acquire calls */
GstAtomicQueue *queue;
/* control-token channel: tokens are written at init, when flushing starts
 * and each time a memory is pushed to the queue */
GstPoll *poll;
/* taken by set_active and by dec_outstanding before stopping the pool */
GRecMutex lock;
/* set by start, cleared by stop once the queue has been cleared */
gboolean started;
/* state requested through set_active */
gboolean active;
/* count of acquire calls not yet balanced by dec_outstanding */
guint outstanding;
/* count of memory objects allocated by the pool and not yet freed by it */
guint cur_mems;
/* read and written with g_atomic_int_get/g_atomic_int_set */
gboolean flushing;
};
/* Forward declarations for functions referenced before their definition */
static void gst_cuda_pool_allocator_finalize (GObject * object);
static gboolean
gst_cuda_pool_allocator_set_active (GstCudaAllocator * allocator,
gboolean active);
static gboolean gst_cuda_pool_allocator_start (GstCudaPoolAllocator * self);
static gboolean gst_cuda_pool_allocator_stop (GstCudaPoolAllocator * self);
static gboolean gst_cuda_memory_release (GstMiniObject * mini_object);
/* Rename the parent-class pointer that the type macro defines, so it is
 * referred to as pool_alloc_parent_class in this file */
#define gst_cuda_pool_allocator_parent_class pool_alloc_parent_class
G_DEFINE_TYPE_WITH_PRIVATE (GstCudaPoolAllocator,
gst_cuda_pool_allocator, GST_TYPE_CUDA_ALLOCATOR);
/* Class initializer: installs the finalize handler and the pool's
 * set_active implementation */
static void
gst_cuda_pool_allocator_class_init (GstCudaPoolAllocatorClass * klass)
{
  GstCudaAllocatorClass *cuda_alloc_class = GST_CUDA_ALLOCATOR_CLASS (klass);
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

  cuda_alloc_class->set_active = gst_cuda_pool_allocator_set_active;
  gobject_class->finalize = gst_cuda_pool_allocator_finalize;
}
/* Instance initializer: sets up private state. A new pool starts out
 * flushing, inactive and not started, with two control tokens queued. */
static void
gst_cuda_pool_allocator_init (GstCudaPoolAllocator * allocator)
{
  GstCudaPoolAllocatorPrivate *priv =
      gst_cuda_pool_allocator_get_instance_private (allocator);

  allocator->priv = priv;

  priv->started = FALSE;
  priv->active = FALSE;
  priv->flushing = 1;

  g_rec_mutex_init (&priv->lock);
  priv->queue = gst_atomic_queue_new (16);
  priv->poll = gst_poll_new_timer ();

  /* 1 control write for flushing - the flush token */
  gst_poll_write_control (priv->poll);
  /* 1 control write for marking that we are not waiting for poll - the wait token */
  gst_poll_write_control (priv->poll);
}
/* GObject finalize: stops the pool, then releases the queue, the poll,
 * the lock and the stream/context references before chaining up */
static void
gst_cuda_pool_allocator_finalize (GObject * object)
{
  GstCudaPoolAllocator *pool_alloc = GST_CUDA_POOL_ALLOCATOR (object);
  GstCudaPoolAllocatorPrivate *pool_priv = pool_alloc->priv;

  GST_DEBUG_OBJECT (pool_alloc, "Finalize");

  gst_cuda_pool_allocator_stop (pool_alloc);

  gst_atomic_queue_unref (pool_priv->queue);
  gst_poll_free (pool_priv->poll);
  g_rec_mutex_clear (&pool_priv->lock);

  gst_clear_cuda_stream (&pool_alloc->stream);
  gst_clear_object (&pool_alloc->context);

  G_OBJECT_CLASS (pool_alloc_parent_class)->finalize (object);
}
/* Marks the pool as started; always succeeds */
static gboolean
gst_cuda_pool_allocator_start (GstCudaPoolAllocator * self)
{
  self->priv->started = TRUE;

  return TRUE;
}
/* Switches the flushing state of the pool.
 *
 * Entering flushing sets the atomic flag and writes one control token (the
 * flush token) to the poll. Leaving flushing reads that token back, retrying
 * while the read reports EWOULDBLOCK, and then clears the flag.
 * Does nothing when the pool is already in the requested state. */
static void
gst_cuda_pool_allocator_do_set_flushing (GstCudaPoolAllocator * self,
gboolean flushing)
{
GstCudaPoolAllocatorPrivate *priv = self->priv;
if (GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING (self) == flushing)
return;
if (flushing) {
g_atomic_int_set (&priv->flushing, 1);
/* Write the flush token to wake up any waiters */
gst_poll_write_control (priv->poll);
} else {
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* This should not really happen unless flushing and unflushing
* happens on different threads. Let's wait a bit to get back flush
* token from the thread that was setting it to flushing */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
/* The flag is cleared even if reading the token failed with an error */
g_atomic_int_set (&priv->flushing, 0);
}
}
/* GstCudaAllocatorClass::set_active implementation for the pool allocator.
 *
 * Activating starts the pool and leaves the flushing state. Deactivating
 * enters the flushing state and stops the pool right away only when no
 * memory is outstanding. The whole transition runs under priv->lock.
 *
 * Returns FALSE only when stopping the pool failed; in that case
 * priv->active is left unchanged. */
static gboolean
gst_cuda_pool_allocator_set_active (GstCudaAllocator * allocator,
gboolean active)
{
GstCudaPoolAllocator *self = GST_CUDA_POOL_ALLOCATOR (allocator);
GstCudaPoolAllocatorPrivate *priv = self->priv;
gboolean ret = TRUE;
GST_LOG_OBJECT (self, "active %d", active);
g_rec_mutex_lock (&priv->lock);
/* just return if we are already in the right state */
if (priv->active == active)
goto done;
if (active) {
gst_cuda_pool_allocator_start (self);
/* flush_stop may release memory objects, setting to active to avoid running
* do_stop while activating the pool */
priv->active = TRUE;
gst_cuda_pool_allocator_do_set_flushing (self, FALSE);
} else {
gint outstanding;
/* set to flushing first */
gst_cuda_pool_allocator_do_set_flushing (self, TRUE);
/* when all memory objects are in the pool, free them. Else they will be
* freed when they are released */
outstanding = g_atomic_int_get (&priv->outstanding);
GST_LOG_OBJECT (self, "outstanding memories %d, (in queue %d)",
outstanding, gst_atomic_queue_length (priv->queue));
if (outstanding == 0) {
if (!gst_cuda_pool_allocator_stop (self)) {
GST_ERROR_OBJECT (self, "stop failed");
ret = FALSE;
goto done;
}
}
priv->active = FALSE;
}
done:
g_rec_mutex_unlock (&priv->lock);
return ret;
}
/* Drops the pool's bookkeeping for @mem and releases the pool's reference.
 * The dispose hook is cleared first so the unref is not intercepted. */
static void
gst_cuda_pool_allocator_free_memory (GstCudaPoolAllocator * self,
    GstMemory * mem)
{
  GstCudaPoolAllocatorPrivate *pool_priv = self->priv;

  g_atomic_int_add (&pool_priv->cur_mems, -1);
  GST_LOG_OBJECT (self, "freeing memory %p (%u left)", mem,
      pool_priv->cur_mems);

  GST_MINI_OBJECT_CAST (mem)->dispose = NULL;
  gst_memory_unref (mem);
}
/* Frees every memory object currently held in the pool's queue.
 *
 * When the pool has a stream, the stream is synchronized once up front, so
 * each queued memory can have its NEED_SYNC flag cleared before it is freed.
 * For every popped memory one control token is read back from the poll,
 * matching the token written when the memory was queued.
 *
 * Returns TRUE when the allocation counter (cur_mems) is zero afterwards. */
static gboolean
gst_cuda_pool_allocator_clear_queue (GstCudaPoolAllocator * self)
{
GstCudaPoolAllocatorPrivate *priv = self->priv;
GstMemory *memory;
GST_LOG_OBJECT (self, "Clearing queue");
if (self->stream) {
/* Wait for outstanding operations */
gst_cuda_context_push (self->context);
CuStreamSynchronize (gst_cuda_stream_get_handle (self->stream));
gst_cuda_context_pop (NULL);
}
while ((memory = (GstMemory *) gst_atomic_queue_pop (priv->queue))) {
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* We put the memory into the queue but did not finish writing control
* yet, let's wait a bit and retry */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
/* Already synchronized above */
GST_MEMORY_FLAG_UNSET (memory, GST_CUDA_MEMORY_TRANSFER_NEED_SYNC);
gst_cuda_pool_allocator_free_memory (self, memory);
}
GST_LOG_OBJECT (self, "Clear done");
return priv->cur_mems == 0;
}
/* must be called with the lock
 *
 * Clears the queue of a started pool and marks it as not started.
 * Returns FALSE (leaving the pool started) when the queue could not be
 * fully cleared; a pool that was never started is reported as stopped. */
static gboolean
gst_cuda_pool_allocator_stop (GstCudaPoolAllocator * self)
{
  GstCudaPoolAllocatorPrivate *priv = self->priv;

  GST_DEBUG_OBJECT (self, "Stop");

  if (!priv->started)
    return TRUE;

  if (!gst_cuda_pool_allocator_clear_queue (self))
    return FALSE;

  priv->started = FALSE;

  return TRUE;
}
/* Decrements the outstanding counter. When it reaches zero while the pool
 * is flushing and no longer active, the pool is stopped. */
static inline void
dec_outstanding (GstCudaPoolAllocator * self)
{
  /* still some memory objects out there */
  if (!g_atomic_int_dec_and_test (&self->priv->outstanding))
    return;

  /* all memory objects are returned to the pool; only a flushing pool
   * may need to free them */
  if (!GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING (self))
    return;

  /* take the lock so that set_active is not run concurrently */
  g_rec_mutex_lock (&self->priv->lock);
  /* now that we have the lock, check if we have been de-activated with
   * outstanding buffers */
  if (!self->priv->active)
    gst_cuda_pool_allocator_stop (self);
  g_rec_mutex_unlock (&self->priv->lock);
}
/* Puts @mem back into the pool's queue.
 *
 * The dispose hook is removed and the memory's allocator pointer is set to
 * a new reference on the default allocator (_gst_cuda_allocator) while the
 * memory sits in the queue. A control token is written for the queued
 * memory, the outstanding counter is decremented, and finally one reference
 * on the pool allocator is dropped. */
static void
gst_cuda_pool_allocator_release_memory (GstCudaPoolAllocator * self,
GstMemory * mem)
{
GST_LOG_OBJECT (self, "Released memory %p", mem);
GST_MINI_OBJECT_CAST (mem)->dispose = NULL;
mem->allocator = gst_object_ref (_gst_cuda_allocator);
/* keep it around in our queue */
gst_atomic_queue_push (self->priv->queue, mem);
gst_poll_write_control (self->priv->poll);
dec_outstanding (self);
/* NOTE(review): presumably balances the reference the memory held on the
 * pool allocator while it was handed out - confirm against acquire */
gst_object_unref (self);
}
/* GstMiniObject dispose hook installed on memory handed out by the pool.
 *
 * Returns TRUE to let the memory be freed (not owned by a pool allocator,
 * or the pool is flushing). Otherwise the memory is revived with a new
 * reference, handed back to the pool and FALSE is returned. */
static gboolean
gst_cuda_memory_release (GstMiniObject * object)
{
  GstMemory *memory = GST_MEMORY_CAST (object);
  GstCudaPoolAllocator *pool_alloc;

  g_assert (memory->allocator);

  if (!GST_IS_CUDA_POOL_ALLOCATOR (memory->allocator)) {
    GST_LOG_OBJECT (memory->allocator, "Not our memory, free");
    return TRUE;
  }

  pool_alloc = GST_CUDA_POOL_ALLOCATOR (memory->allocator);

  /* if flushing, free this memory */
  if (GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING (pool_alloc)) {
    GST_LOG_OBJECT (pool_alloc, "allocator is flushing, free %p", memory);
    return TRUE;
  }

  /* return the memory to the allocator */
  gst_memory_ref (memory);
  gst_cuda_pool_allocator_release_memory (pool_alloc, memory);

  return FALSE;
}
/* must be called with the lock
 *
 * Allocates one more memory object through the default CUDA allocator,
 * using the pool's context, stream and video info. The allocation counter
 * is bumped before the attempt and rolled back on failure. */
static GstFlowReturn
gst_cuda_pool_allocator_alloc (GstCudaPoolAllocator * self, GstMemory ** mem)
{
  GstCudaPoolAllocatorPrivate *priv = self->priv;
  GstMemory *allocated;

  /* increment the allocation counter */
  g_atomic_int_add (&priv->cur_mems, 1);

  allocated = gst_cuda_allocator_alloc ((GstCudaAllocator *)
      _gst_cuda_allocator, self->context, self->stream, &self->info);
  if (allocated) {
    *mem = allocated;
    return GST_FLOW_OK;
  }

  GST_ERROR_OBJECT (self, "Failed to allocate new memory");
  g_atomic_int_add (&priv->cur_mems, -1);

  return GST_FLOW_ERROR;
}
/* Core acquire loop of the pool allocator.
 *
 * Each iteration: bail out with GST_FLOW_FLUSHING when the pool is flushing;
 * otherwise try to pop a memory from the queue (consuming the control token
 * that was written when it was queued); otherwise try a fresh allocation.
 * Only when the allocation reports GST_FLOW_EOS does the loop fall through
 * to the wait-token handshake on the poll and retry.
 *
 * NOTE(review): gst_cuda_pool_allocator_alloc() in this file returns only
 * GST_FLOW_OK or GST_FLOW_ERROR, so the waiting branch looks unreachable
 * through that call - confirm whether this is intended. */
static GstFlowReturn
gst_cuda_pool_allocator_acquire_memory_internal (GstCudaPoolAllocator * self,
GstMemory ** memory)
{
GstFlowReturn result;
GstCudaPoolAllocatorPrivate *priv = self->priv;
while (TRUE) {
if (G_UNLIKELY (GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING (self)))
goto flushing;
/* try to get a memory from the queue */
*memory = (GstMemory *) gst_atomic_queue_pop (priv->queue);
if (G_LIKELY (*memory)) {
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* We put the memory into the queue but did not finish writing control
* yet, let's wait a bit and retry */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
result = GST_FLOW_OK;
GST_LOG_OBJECT (self, "acquired memory %p", *memory);
break;
}
/* no memory, try to allocate some more */
GST_LOG_OBJECT (self, "no memory, trying to allocate");
result = gst_cuda_pool_allocator_alloc (self, memory);
if (G_LIKELY (result == GST_FLOW_OK))
/* we have a memory, return it */
break;
if (G_UNLIKELY (result != GST_FLOW_EOS))
/* something went wrong, return error */
break;
/* now we release the control socket, we wait for a memory release or
* flushing */
if (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* This means that we have two threads trying to allocate memory
* already, and the other one already got the wait token. This
* means that we only have to wait for the poll now and not write the
* token afterwards: we will be woken up once the other thread is
* woken up and that one will write the wait token it removed */
GST_LOG_OBJECT (self, "waiting for free memory or flushing");
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
} else {
/* This is a critical error, GstPoll already gave a warning */
result = GST_FLOW_ERROR;
break;
}
} else {
/* We're the first thread waiting, we got the wait token and have to
* write it again later
* OR
* We're a second thread and just consumed the flush token and block all
* other threads, in which case we must not wait and give it back
* immediately */
if (!GST_CUDA_POOL_ALLOCATOR_IS_FLUSHING (self)) {
GST_LOG_OBJECT (self, "waiting for free memory or flushing");
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
}
gst_poll_write_control (priv->poll);
}
}
return result;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (self, "we are flushing");
return GST_FLOW_FLUSHING;
}
}
/**
 * gst_cuda_pool_allocator_new:
 * @context: a #GstCudaContext
 * @stream: (allow-none): a #GstCudaStream
 * @info: a #GstVideoInfo
 *
 * Creates a new #GstCudaPoolAllocator instance. The allocator takes its own
 * references on @context and @stream and stores a copy of @info.
 *
 * Returns: (transfer full): a new #GstCudaPoolAllocator instance
 *
 * Since: 1.24
 */
GstCudaPoolAllocator *
gst_cuda_pool_allocator_new (GstCudaContext * context, GstCudaStream * stream,
    const GstVideoInfo * info)
{
  GstCudaPoolAllocator *self;

  g_return_val_if_fail (GST_IS_CUDA_CONTEXT (context), NULL);
  g_return_val_if_fail (!stream || GST_IS_CUDA_STREAM (stream), NULL);
  /* @info is copied by value below, so it must not be NULL; this mirrors
   * the argument check done by gst_cuda_allocator_alloc() */
  g_return_val_if_fail (info != NULL, NULL);

  self = g_object_new (GST_TYPE_CUDA_POOL_ALLOCATOR, NULL);
  gst_object_ref_sink (self);

  self->context = gst_object_ref (context);
  if (stream)
    self->stream = gst_cuda_stream_ref (stream);
  self->info = *info;

  return self;
}
/**
 * gst_cuda_pool_allocator_acquire_memory:
 * @allocator: a #GstCudaPoolAllocator
 * @memory: (out): a #GstMemory
 *
 * Acquires a #GstMemory from @allocator. @memory should point to a memory
 * location that can hold a pointer to the new #GstMemory.
 *
 * On success the returned memory is owned by @allocator: its allocator
 * pointer is replaced with a reference to @allocator and a dispose hook is
 * installed so that the memory goes back to the pool when released.
 *
 * Returns: a #GstFlowReturn such as %GST_FLOW_FLUSHING when the allocator is
 * inactive.
 *
 * Since: 1.24
 */
GstFlowReturn
gst_cuda_pool_allocator_acquire_memory (GstCudaPoolAllocator * allocator,
    GstMemory ** memory)
{
  GstFlowReturn result;
  GstCudaPoolAllocatorPrivate *priv;

  g_return_val_if_fail (GST_IS_CUDA_POOL_ALLOCATOR (allocator), GST_FLOW_ERROR);
  g_return_val_if_fail (memory, GST_FLOW_ERROR);

  priv = allocator->priv;

  /* Count the memory as outstanding before trying to acquire it. This is
   * the only increment per acquire: each release decrements exactly once
   * via dec_outstanding(), so a second (non-atomic) increment here would
   * keep the counter from ever returning to zero. */
  g_atomic_int_inc (&priv->outstanding);
  result = gst_cuda_pool_allocator_acquire_memory_internal (allocator, memory);

  if (result == GST_FLOW_OK) {
    GstMemory *mem = *memory;

    /* Replace default allocator with ours */
    gst_object_unref (mem->allocator);
    mem->allocator = gst_object_ref (allocator);
    /* Route the final unref of this memory back to the pool */
    GST_MINI_OBJECT_CAST (mem)->dispose = gst_cuda_memory_release;
  } else {
    /* Nothing was handed out, undo the accounting done above */
    dec_outstanding (allocator);
  }

  return result;
}

View file

@ -24,6 +24,7 @@
#include <gst/video/video.h> #include <gst/video/video.h>
#include <gst/cuda/cuda-prelude.h> #include <gst/cuda/cuda-prelude.h>
#include <gst/cuda/gstcudacontext.h> #include <gst/cuda/gstcudacontext.h>
#include <gst/cuda/gstcudastream.h>
G_BEGIN_DECLS G_BEGIN_DECLS
@ -33,6 +34,7 @@ G_BEGIN_DECLS
#define GST_CUDA_ALLOCATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), GST_TYPE_CUDA_ALLOCATOR,GstCudaAllocatorClass)) #define GST_CUDA_ALLOCATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), GST_TYPE_CUDA_ALLOCATOR,GstCudaAllocatorClass))
#define GST_IS_CUDA_ALLOCATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CUDA_ALLOCATOR)) #define GST_IS_CUDA_ALLOCATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CUDA_ALLOCATOR))
#define GST_IS_CUDA_ALLOCATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CUDA_ALLOCATOR)) #define GST_IS_CUDA_ALLOCATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CUDA_ALLOCATOR))
/** /**
* GST_CUDA_ALLOCATOR_CAST: * GST_CUDA_ALLOCATOR_CAST:
* *
@ -46,19 +48,24 @@ G_BEGIN_DECLS
*/ */
#define GST_CUDA_MEMORY_CAST(mem) ((GstCudaMemory *) (mem)) #define GST_CUDA_MEMORY_CAST(mem) ((GstCudaMemory *) (mem))
/** #define GST_TYPE_CUDA_POOL_ALLOCATOR (gst_cuda_pool_allocator_get_type())
* GstCudaAllocator: #define GST_CUDA_POOL_ALLOCATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_CUDA_POOL_ALLOCATOR,GstCudaPoolAllocator))
* #define GST_CUDA_POOL_ALLOCATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_CUDA_POOL_ALLOCATOR,GstCudaPoolAllocatorClass))
* A #GstAllocator subclass for cuda memory #define GST_CUDA_POOL_ALLOCATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), GST_TYPE_CUDA_POOL_ALLOCATOR,GstCudaPoolAllocatorClass))
* #define GST_IS_CUDA_POOL_ALLOCATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CUDA_POOL_ALLOCATOR))
* Since: 1.22 #define GST_IS_CUDA_POOL_ALLOCATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_CUDA_POOL_ALLOCATOR))
*/
typedef struct _GstCudaAllocator GstCudaAllocator;
typedef struct _GstCudaAllocatorClass GstCudaAllocatorClass;
typedef struct _GstCudaMemory GstCudaMemory; typedef struct _GstCudaMemory GstCudaMemory;
typedef struct _GstCudaMemoryPrivate GstCudaMemoryPrivate; typedef struct _GstCudaMemoryPrivate GstCudaMemoryPrivate;
typedef struct _GstCudaAllocator GstCudaAllocator;
typedef struct _GstCudaAllocatorClass GstCudaAllocatorClass;
typedef struct _GstCudaAllocatorPrivate GstCudaAllocatorPrivate;
typedef struct _GstCudaPoolAllocator GstCudaPoolAllocator;
typedef struct _GstCudaPoolAllocatorClass GstCudaPoolAllocatorClass;
typedef struct _GstCudaPoolAllocatorPrivate GstCudaPoolAllocatorPrivate;
/** /**
* GST_MAP_CUDA: * GST_MAP_CUDA:
* *
@ -95,17 +102,37 @@ typedef struct _GstCudaMemoryPrivate GstCudaMemoryPrivate;
/** /**
* GstCudaMemoryTransfer: * GstCudaMemoryTransfer:
* @GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD: the device memory needs downloading
* to the staging memory
* @GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD: the staging memory needs uploading
* to the device memory
* *
* Since: 1.22 * CUDA memory transfer flags
*/ */
typedef enum typedef enum
{ {
GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD = (GST_MEMORY_FLAG_LAST << 0), /**
GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD = (GST_MEMORY_FLAG_LAST << 1) * GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD:
*
* the device memory needs downloading to the staging memory
*
* Since: 1.22
*/
GST_CUDA_MEMORY_TRANSFER_NEED_DOWNLOAD = (GST_MEMORY_FLAG_LAST << 0),
/**
* GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD:
*
* the staging memory needs uploading to the device memory
*
* Since: 1.22
*/
GST_CUDA_MEMORY_TRANSFER_NEED_UPLOAD = (GST_MEMORY_FLAG_LAST << 1),
/**
* GST_CUDA_MEMORY_TRANSFER_NEED_SYNC:
*
* the device memory needs synchronization
*
* Since: 1.24
*/
GST_CUDA_MEMORY_TRANSFER_NEED_SYNC = (GST_MEMORY_FLAG_LAST << 2),
} GstCudaMemoryTransfer; } GstCudaMemoryTransfer;
/** /**
@ -126,29 +153,105 @@ struct _GstCudaMemory
gpointer _gst_reserved[GST_PADDING]; gpointer _gst_reserved[GST_PADDING];
}; };
GST_CUDA_API
void gst_cuda_memory_init_once (void);
GST_CUDA_API
gboolean gst_is_cuda_memory (GstMemory * mem);
GST_CUDA_API
GstCudaStream * gst_cuda_memory_get_stream (GstCudaMemory * mem);
GST_CUDA_API
void gst_cuda_memory_sync (GstCudaMemory * mem);
/**
* GstCudaAllocator:
*
* A #GstAllocator subclass for cuda memory
*
* Since: 1.22
*/
struct _GstCudaAllocator struct _GstCudaAllocator
{ {
GstAllocator parent; GstAllocator parent;
/*< private >*/
GstCudaAllocatorPrivate *priv;
gpointer _gst_reserved[GST_PADDING];
}; };
struct _GstCudaAllocatorClass struct _GstCudaAllocatorClass
{ {
GstAllocatorClass parent_class; GstAllocatorClass parent_class;
/**
* GstCudaAllocatorClass::set_active:
* @allocator: a #GstCudaAllocator
* @active: the new active state
*
* Since: 1.24
*/
gboolean (*set_active) (GstCudaAllocator * allocator,
gboolean active);
/*< private >*/
gpointer _gst_reserved[GST_PADDING_LARGE];
}; };
GST_CUDA_API GST_CUDA_API
void gst_cuda_memory_init_once (void); GType gst_cuda_allocator_get_type (void);
GST_CUDA_API GST_CUDA_API
gboolean gst_is_cuda_memory (GstMemory * mem); GstMemory * gst_cuda_allocator_alloc (GstCudaAllocator * allocator,
GstCudaContext * context,
GstCudaStream * stream,
const GstVideoInfo * info);
GST_CUDA_API GST_CUDA_API
GType gst_cuda_allocator_get_type (void); gboolean gst_cuda_allocator_set_active (GstCudaAllocator * allocator,
gboolean active);
/**
* GstCudaPoolAllocator:
*
* A #GstCudaAllocator subclass for cuda memory pool
*
* Since: 1.24
*/
struct _GstCudaPoolAllocator
{
GstCudaAllocator parent;
GstCudaContext *context;
GstCudaStream *stream;
GstVideoInfo info;
/*< private >*/
GstCudaPoolAllocatorPrivate *priv;
gpointer _gst_reserved[GST_PADDING];
};
struct _GstCudaPoolAllocatorClass
{
GstCudaAllocatorClass parent_class;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
GST_CUDA_API GST_CUDA_API
GstMemory * gst_cuda_allocator_alloc (GstCudaAllocator * allocator, GType gst_cuda_pool_allocator_get_type (void);
GstCudaContext * context,
const GstVideoInfo * info); GST_CUDA_API
GstCudaPoolAllocator * gst_cuda_pool_allocator_new (GstCudaContext * context,
GstCudaStream * stream,
const GstVideoInfo * info);
GST_CUDA_API
GstFlowReturn gst_cuda_pool_allocator_acquire_memory (GstCudaPoolAllocator * allocator,
GstMemory ** memory);
G_END_DECLS G_END_DECLS