From 66bdae5f33292dd73e63fbfce639ac1dbd311a3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim-Philipp=20M=C3=BCller?= Date: Fri, 28 Nov 2014 14:38:30 +0000 Subject: [PATCH] plugins: add helper function for writing buffers out with writev() --- configure.ac | 3 + plugins/elements/gstelements_private.c | 241 +++++++++++++++++++++++++ plugins/elements/gstelements_private.h | 6 + 3 files changed, 250 insertions(+) diff --git a/configure.ac b/configure.ac index 1e7c88d393..ab50a3ad76 100644 --- a/configure.ac +++ b/configure.ac @@ -344,6 +344,9 @@ AM_CONDITIONAL(HAVE_PTHREAD, test "x$HAVE_PTHREAD" = "xyes") dnl check for sys/prctl for setting thread name on Linux AC_CHECK_HEADERS([sys/prctl.h], [], [], [AC_INCLUDES_DEFAULT]) +dnl check for sys/uio.h for writev() +AC_CHECK_HEADERS([sys/uio.h], [], [], [AC_INCLUDES_DEFAULT]) + dnl Check for valgrind.h dnl separate from HAVE_VALGRIND because you can have the program, but not dnl the dev package diff --git a/plugins/elements/gstelements_private.c b/plugins/elements/gstelements_private.c index f539a8554a..ff2a3a12c7 100644 --- a/plugins/elements/gstelements_private.c +++ b/plugins/elements/gstelements_private.c @@ -25,6 +25,15 @@ #ifdef HAVE_CONFIG_H # include "config.h" #endif +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_UIO_H +#include +#endif +#include +#include #include #include "gst/gst.h" #include "gstelements_private.h" @@ -65,3 +74,235 @@ gst_buffer_get_flags_string (GstBuffer * buffer) return flag_str; } + +/* Define our own iovec structure here, so that we can use it unconditionally + * in the code below and use almost the same code path for systems where + * writev() is supported and those were it's not supported */ +#ifndef HAVE_SYS_UIO_H +struct iovec +{ + gpointer iov_base; + gsize iov_len; +}; +#endif + +/* completely arbitrary thresholds */ +#define FDSINK_MAX_ALLOCA_SIZE (64 * 1024) /* 64k */ +#define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024) /* 8M */ + +static gssize +gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes) +{ + gssize written; + +#ifdef HAVE_SYS_UIO_H + if (TRUE) { + do { + written = writev (fd, iov, iovcnt); + } while (written < 0 && errno == EINTR); + } else +#endif + { + gint i; + + /* We merge the memories here because technically write()/writev() is + * supposed to be atomic, which it's not if we do multiple separate + * write() calls. It's very doubtful anyone cares though in our use + * cases, and it's not clear how that can be reconciled with the + * possibility of short writes, so in any case we might want to + * simplify this later or just remove it. */ + if (total_bytes <= FDSINK_MAX_MALLOC_SIZE) { + gchar *mem, *p; + + if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE) + mem = g_alloca (total_bytes); + else + mem = g_malloc (total_bytes); + + p = mem; + for (i = 0; i < iovcnt; ++i) { + memcpy (p, iov[i].iov_base, iov[i].iov_len); + p += iov[i].iov_len; + } + + do { + written = write (fd, mem, total_bytes); + } while (written < 0 && errno == EINTR); + + if (total_bytes > FDSINK_MAX_ALLOCA_SIZE) + g_free (mem); + } else { + gssize ret; + + written = 0; + for (i = 0; i < iovcnt; ++i) { + do { + ret = write (fd, iov[i].iov_base, iov[i].iov_len); + } while (ret < 0 && errno == EINTR); + if (ret > 0) + written += ret; + if (ret != iov[i].iov_len) + break; + } + } + } + + return written; +} + +static gsize +fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf) +{ + GstMemory *mem; + gsize size = 0; + guint i; + + g_assert (gst_buffer_n_memory (buf) == n); + + for (i = 0; i < n; ++i) { + mem = gst_buffer_peek_memory (buf, i); + if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) { + vecs[i].iov_base = maps[i].data; + vecs[i].iov_len = maps[i].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[i].iov_base = (void *) ""; + vecs[i].iov_len = 0; + } + size += vecs[i].iov_len; + } + + return size; +} + +GstFlowReturn +gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, + GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums, + guint total_mem_num, guint64 * total_written, guint64 * cur_pos) +{ + struct iovec *vecs; + GstMapInfo *map_infos; + GstFlowReturn flow_ret; + gsize size = 0; + guint i, j; + + GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num); + + vecs = g_newa (struct iovec, total_mem_num); + map_infos = g_newa (GstMapInfo, total_mem_num); + + /* populate output vectors */ + for (i = 0, j = 0; i < num_buffers; ++i) { + size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]); + j += mem_nums[i]; + } + + /* now write it all out! */ + { + gssize ret, left; + guint n_vecs = total_mem_num; + + left = size; + do { +#ifndef HAVE_WIN32 + if (fdset != NULL) { + do { + GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT + " bytes to write", left); + ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE); + } while (ret == -1 && (errno == EINTR || errno == EAGAIN)); + + if (ret == -1) { + if (errno == EBUSY) + goto stopped; + else + goto select_error; + } + } +#endif + + ret = gst_writev (fd, vecs, n_vecs, left); + + if (ret > 0) { + if (total_written) + *total_written += ret; + if (cur_pos) + *cur_pos += ret; + } + + if (ret == left) + break; + + if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* do nothing, try again */ + } else if (ret < 0) { + goto write_error; + } else if (ret < left) { + /* skip vectors that have been written in full */ + while (ret >= vecs[0].iov_len) { + ret -= vecs[0].iov_len; + left -= vecs[0].iov_len; + ++vecs; + --n_vecs; + } + g_assert (n_vecs > 0); + /* skip partially written vector data */ + if (ret > 0) { + vecs[0].iov_len -= ret; + vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret; + left -= ret; + } + } +#ifdef HAVE_WIN32 + /* do short sleep on windows where we don't use gst_poll(), + * to avoid excessive busy looping */ + if (fdset != NULL) + g_usleep (1000); +#endif + + } + while (left > 0); + } + + flow_ret = GST_FLOW_OK; + +out: + + for (i = 0; i < total_mem_num; ++i) + gst_memory_unmap (map_infos[i].memory, &map_infos[i]); + + return flow_ret; + +/* ERRORS */ +#ifndef HAVE_WIN32 +select_error: + { + GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + ("select on file descriptor: %s", g_strerror (errno))); + GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno)); + flow_ret = GST_FLOW_ERROR; + goto out; + } +stopped: + { + GST_DEBUG_OBJECT (sink, "Select stopped"); + flow_ret = GST_FLOW_FLUSHING; + goto out; + } +#endif +write_error: + { + switch (errno) { + case ENOSPC: + GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL)); + break; + default:{ + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), + ("Error while writing to file descriptor %d: %s", + fd, g_strerror (errno))); + } + } + flow_ret = GST_FLOW_ERROR; + goto out; + } +} diff --git a/plugins/elements/gstelements_private.h b/plugins/elements/gstelements_private.h index 9ccae1b8e2..469b3b83c2 100644 --- a/plugins/elements/gstelements_private.h +++ b/plugins/elements/gstelements_private.h @@ -30,6 +30,12 @@ G_BEGIN_DECLS G_GNUC_INTERNAL char * gst_buffer_get_flags_string (GstBuffer *buffer); +G_GNUC_INTERNAL +GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, + GstBuffer ** buffers, guint num_buffers, + guint8 * mem_nums, guint total_mem_num, + guint64 * total_written, guint64 * cur_pos); + G_END_DECLS #endif /* __GST_ELEMENTS_PRIVATE_H__ */