fdsink/filesink: Refactor writev() code to prevent stack overflows

If buffer lists with too many buffers would be written before, a stack
overflow would happen because of memory linear with the number of
GstMemory would be allocated on the stack. This could happen for example
when filesink is configured with a very big buffer size.

Instead now move the buffer and buffer list writing into the helper
functions and at most write IOV_MAX memories at once. Anything bigger
than that wouldn't be passed to writev() anyway and written differently
in the previous code, so this also potentially speeds up writing for
these cases.

For example the following pipeline would crash with a stackoverflow:
gst-launch-1.0 audiotestsrc ! filesink buffer-size=1073741824 location=/dev/null
This commit is contained in:
Sebastian Dröge 2020-03-20 18:43:30 +02:00 committed by GStreamer Merge Bot
parent b3afd1a2fc
commit c15f1b2bec
4 changed files with 373 additions and 175 deletions

View file

@ -206,73 +206,29 @@ gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
return written;
}
static gsize
fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
static GstFlowReturn
gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset,
struct iovec *vecs, guint n_vecs,
guint64 * bytes_written, gint max_transient_error_timeout,
guint64 current_position, gboolean * flushing)
{
GstMemory *mem;
gsize size = 0;
guint i;
g_assert (gst_buffer_n_memory (buf) == n);
for (i = 0; i < n; ++i) {
mem = gst_buffer_peek_memory (buf, i);
if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
vecs[i].iov_base = maps[i].data;
vecs[i].iov_len = maps[i].size;
} else {
GST_WARNING ("Failed to map memory %p for reading", mem);
vecs[i].iov_base = (void *) "";
vecs[i].iov_len = 0;
}
size += vecs[i].iov_len;
}
return size;
}
GstFlowReturn
gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
guint total_mem_num, guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing)
{
struct iovec *vecs;
GstMapInfo *map_infos;
GstFlowReturn flow_ret;
gsize size = 0;
guint i, j;
gint64 start_time = 0;
*bytes_written = 0;
max_transient_error_timeout *= 1000;
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
vecs = g_newa (struct iovec, total_mem_num);
map_infos = g_newa (GstMapInfo, total_mem_num);
/* populate output vectors */
for (i = 0, j = 0; i < num_buffers; ++i) {
size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]);
j += mem_nums[i];
}
GST_LOG_OBJECT (sink, "%u iovecs", n_vecs);
/* now write it all out! */
{
gssize ret, left;
guint n_vecs = total_mem_num;
left = size;
if (skip) {
ret = skip;
errno = 0;
goto skip_first;
}
do {
if (flushing != NULL && g_atomic_int_get (flushing)) {
GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
@ -299,20 +255,16 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
ret = gst_writev (fd, vecs, n_vecs, left);
if (ret > 0) {
if (bytes_written)
*bytes_written += ret;
/* Wrote something, allow the caller to update the vecs passed here */
*bytes_written = ret;
break;
}
skip_first:
if (ret == left)
break;
if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) {
/* do nothing, try again */
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
} else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
} else if (errno == EACCES && max_transient_error_timeout > 0) {
/* seek back to where we started writing and try again after sleeping
* for 10ms.
*
@ -338,31 +290,14 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
* happened and we have no idea how much and if what was written
* is actually correct (it sometimes isn't)
*/
ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
if (ret < 0 || ret != current_position + *bytes_written) {
ret = lseek (fd, current_position, SEEK_SET);
if (ret < 0 || ret != current_position) {
GST_ERROR_OBJECT (sink,
"failed to seek back to current write position");
goto write_error;
}
} else if (ret < 0) {
} else {
goto write_error;
} else { /* if (ret < left) */
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
/* skip vectors that have been written in full */
while (ret >= vecs[0].iov_len) {
ret -= vecs[0].iov_len;
left -= vecs[0].iov_len;
++vecs;
--n_vecs;
}
g_assert (n_vecs > 0);
/* skip partially written vector data */
if (ret > 0) {
vecs[0].iov_len -= ret;
vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret;
left -= ret;
}
}
#ifdef HAVE_WIN32
/* do short sleep on windows where we don't use gst_poll(),
@ -370,7 +305,6 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
if (fdset != NULL)
g_usleep (1000);
#endif
}
while (left > 0);
}
@ -379,9 +313,6 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
out:
for (i = 0; i < total_mem_num; ++i)
gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
return flow_ret;
/* ERRORS */
@ -417,3 +348,287 @@ write_error:
goto out;
}
}
GstFlowReturn
gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer * buffer,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
struct iovec *vecs;
GstMapInfo *maps;
guint i, num_mem, num_vecs;
gsize left;
/* Buffers can contain up to 16 memories, so we can safely directly call
* writev() here without splitting up */
g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX);
num_mem = num_vecs = gst_buffer_n_memory (buffer);
GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes",
buffer, num_mem, gst_buffer_get_size (buffer));
vecs = g_newa (struct iovec, num_mem);
maps = g_newa (GstMapInfo, num_mem);
/* Map all memories */
{
GstMemory *mem;
guint i;
left = 0;
for (i = 0; i < num_mem; ++i) {
mem = gst_buffer_peek_memory (buffer, i);
if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
vecs[i].iov_base = maps[i].data;
vecs[i].iov_len = maps[i].size;
} else {
GST_WARNING ("Failed to map memory %p for reading", mem);
vecs[i].iov_base = (void *) "";
vecs[i].iov_len = 0;
}
left += vecs[i].iov_len;
}
}
do {
guint64 bytes_written_local = 0;
flow_ret =
gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs,
&bytes_written_local, max_transient_error_timeout, current_position,
flushing);
GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
bytes_written_local, left, gst_flow_get_name (flow_ret));
if (flow_ret != GST_FLOW_OK) {
g_assert (bytes_written_local == 0);
break;
}
if (bytes_written)
*bytes_written += bytes_written_local;
/* Done, no need to do bookkeeping */
if (bytes_written_local == left)
break;
/* skip vectors that have been written in full */
while (bytes_written_local >= vecs[0].iov_len) {
bytes_written_local -= vecs[0].iov_len;
left -= vecs[0].iov_len;
++vecs;
--num_vecs;
}
g_assert (num_vecs > 0);
/* skip partially written vector data */
if (bytes_written_local > 0) {
vecs[0].iov_len -= bytes_written_local;
vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local;
left -= bytes_written_local;
}
} while (left > 0);
for (i = 0; i < num_mem; i++)
gst_memory_unmap (maps[i].memory, &maps[i]);
return flow_ret;
}
GstFlowReturn
gst_write_mem (GstObject * sink, gint fd, GstPoll * fdset,
const guint8 * data, guint size,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
struct iovec vec;
gsize left;
GST_DEBUG ("Writing memory %p with %u bytes", data, size);
vec.iov_len = size;
vec.iov_base = (guint8 *) data;
left = size;
do {
guint64 bytes_written_local = 0;
flow_ret =
gst_writev_iovecs (sink, fd, fdset, &vec, 1,
&bytes_written_local, max_transient_error_timeout, current_position,
flushing);
GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
bytes_written_local, left, gst_flow_get_name (flow_ret));
if (flow_ret != GST_FLOW_OK) {
g_assert (bytes_written_local == 0);
break;
}
if (bytes_written)
*bytes_written += bytes_written_local;
/* skip partially written vector data */
if (bytes_written_local < left) {
vec.iov_len -= bytes_written_local;
vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local;
left -= bytes_written_local;
}
} while (left > 0);
return flow_ret;
}
GstFlowReturn
gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
GstBufferList * buffer_list,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
struct iovec *vecs;
GstMapInfo *maps;
guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0;
guint i, num_vecs;
gsize left;
num_bufs = gst_buffer_list_length (buffer_list);
num_vecs = 0;
GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs);
vecs = g_newa (struct iovec, GST_IOV_MAX);
maps = g_newa (GstMapInfo, GST_IOV_MAX);
/* Map the first GST_IOV_MAX memories */
{
GstBuffer *buf;
GstMemory *mem;
guint j = 0;
left = 0;
for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
guint num_mem;
buf = gst_buffer_list_get (buffer_list, i);
num_mem = gst_buffer_n_memory (buf);
for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) {
mem = gst_buffer_peek_memory (buf, j);
if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
vecs[num_vecs].iov_base = maps[num_vecs].data;
vecs[num_vecs].iov_len = maps[num_vecs].size;
} else {
GST_WARNING ("Failed to map memory %p for reading", mem);
vecs[num_vecs].iov_base = (void *) "";
vecs[num_vecs].iov_len = 0;
}
left += vecs[num_vecs].iov_len;
num_vecs++;
}
}
current_buf_idx = i;
current_buf_mem_idx = j;
}
do {
guint64 bytes_written_local = 0;
guint vecs_written = 0;
flow_ret =
gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs,
&bytes_written_local, max_transient_error_timeout, current_position,
flushing);
GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
bytes_written_local, left, gst_flow_get_name (flow_ret));
if (flow_ret != GST_FLOW_OK) {
g_assert (bytes_written_local == 0);
break;
}
if (flow_ret != GST_FLOW_OK) {
g_assert (bytes_written_local == 0);
break;
}
if (bytes_written)
*bytes_written += bytes_written_local;
/* All done, no need for bookkeeping */
if (bytes_written_local == left && current_buf_idx == num_bufs)
break;
/* skip vectors that have been written in full */
while (vecs_written < num_vecs
&& bytes_written_local >= vecs[vecs_written].iov_len) {
bytes_written_local -= vecs[vecs_written].iov_len;
left -= vecs[vecs_written].iov_len;
vecs_written++;
}
g_assert (vecs_written < num_vecs || bytes_written_local == 0);
/* skip partially written vector data */
if (bytes_written_local > 0) {
vecs[vecs_written].iov_len -= bytes_written_local;
vecs[vecs_written].iov_base =
((guint8 *) vecs[0].iov_base) + bytes_written_local;
left -= bytes_written_local;
}
/* If we have buffers left, fill them in now */
if (current_buf_idx < num_bufs) {
GstBuffer *buf;
GstMemory *mem;
guint j = current_buf_mem_idx;
/* Unmap the first vecs_written memories now */
for (i = 0; i < vecs_written; i++)
gst_memory_unmap (maps[i].memory, &maps[i]);
/* Move upper remaining vecs and maps back to the beginning */
memmove (vecs, &vecs[vecs_written],
(num_vecs - vecs_written) * sizeof (vecs[0]));
memmove (maps, &maps[vecs_written],
(num_vecs - vecs_written) * sizeof (maps[0]));
num_vecs -= vecs_written;
/* And finally refill */
for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
guint num_mem;
buf = gst_buffer_list_get (buffer_list, i);
num_mem = gst_buffer_n_memory (buf);
for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX;
j++) {
mem = gst_buffer_peek_memory (buf, j);
if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
vecs[num_vecs].iov_base = maps[num_vecs].data;
vecs[num_vecs].iov_len = maps[num_vecs].size;
} else {
GST_WARNING ("Failed to map memory %p for reading", mem);
vecs[num_vecs].iov_base = (void *) "";
vecs[num_vecs].iov_len = 0;
}
left += vecs[num_vecs].iov_len;
num_vecs++;
}
}
current_buf_idx = i;
current_buf_mem_idx = j;
}
} while (left > 0);
for (i = 0; i < num_vecs; i++)
gst_memory_unmap (maps[i].memory, &maps[i]);
return flow_ret;
}

View file

@ -34,12 +34,25 @@ G_GNUC_INTERNAL
gchar * gst_buffer_get_meta_string (GstBuffer * buffer);
G_GNUC_INTERNAL
GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer ** buffers, guint num_buffers,
guint8 * mem_nums, guint total_mem_num,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing);
GstFlowReturn gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer * buffer,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing);
G_GNUC_INTERNAL
GstFlowReturn gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
GstBufferList * buffer_list,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing);
G_GNUC_INTERNAL
GstFlowReturn gst_write_mem (GstObject * sink, gint fd, GstPoll * fdset,
const guint8 *data, guint size,
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing);
G_END_DECLS

View file

@ -243,18 +243,24 @@ gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query)
}
static GstFlowReturn
gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
guint num_buffers, guint8 * mem_nums, guint total_mems)
gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
GstFdSink *sink;
GstFlowReturn ret;
guint64 skip = 0;
guint num_buffers;
sink = GST_FD_SINK_CAST (bsink);
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
for (;;) {
guint64 bytes_written = 0;
ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip,
0, -1, NULL);
ret = gst_writev_buffer_list (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
buffer_list, &bytes_written, skip, 0, -1, NULL);
sink->bytes_written += bytes_written;
sink->current_pos += bytes_written;
@ -269,38 +275,6 @@ gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
}
return ret;
}
static GstFlowReturn
gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
GstFlowReturn flow;
GstBuffer **buffers;
GstFdSink *sink;
guint8 *mem_nums;
guint total_mems;
guint i, num_buffers;
sink = GST_FD_SINK_CAST (bsink);
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
/* extract buffers from list and count memories */
buffers = g_newa (GstBuffer *, num_buffers);
mem_nums = g_newa (guint8, num_buffers);
for (i = 0, total_mems = 0; i < num_buffers; ++i) {
buffers[i] = gst_buffer_list_get (buffer_list, i);
mem_nums[i] = gst_buffer_n_memory (buffers[i]);
total_mems += mem_nums[i];
}
flow =
gst_fd_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
total_mems);
return flow;
no_data:
{
@ -312,20 +286,31 @@ no_data:
static GstFlowReturn
gst_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstFlowReturn flow;
GstFdSink *sink;
guint8 n_mem;
GstFlowReturn ret;
guint64 skip = 0;
sink = GST_FD_SINK_CAST (bsink);
n_mem = gst_buffer_n_memory (buffer);
for (;;) {
guint64 bytes_written = 0;
if (n_mem > 0)
flow = gst_fd_sink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
else
flow = GST_FLOW_OK;
ret = gst_writev_buffer (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
buffer, &bytes_written, skip, 0, -1, NULL);
return flow;
sink->bytes_written += bytes_written;
sink->current_pos += bytes_written;
skip += bytes_written;
if (!sink->unlock)
break;
ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
if (ret != GST_FLOW_OK)
return ret;
}
return ret;
}
static gboolean

View file

@ -699,55 +699,28 @@ gst_file_sink_get_current_offset (GstFileSink * filesink, guint64 * p_pos)
return (ret != (off_t) - 1);
}
static GstFlowReturn
gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size)
{
GstFlowReturn ret;
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing %u buffers (%u memories, %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size,
sink->current_pos);
ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0,
sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
sink->current_pos += bytes_written;
return ret;
}
static GstFlowReturn
gst_file_sink_render_list_internal (GstFileSink * sink,
GstBufferList * buffer_list)
{
GstFlowReturn flow;
GstBuffer **buffers;
guint8 *mem_nums;
guint total_mems;
gsize total_size = 0;
guint i, num_buffers;
guint64 bytes_written = 0;
guint num_buffers;
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
/* extract buffers from list and count memories */
buffers = g_newa (GstBuffer *, num_buffers);
mem_nums = g_newa (guint8, num_buffers);
for (i = 0, total_mems = 0; i < num_buffers; ++i) {
buffers[i] = gst_buffer_list_get (buffer_list, i);
mem_nums[i] = gst_buffer_n_memory (buffers[i]);
total_mems += mem_nums[i];
total_size += gst_buffer_get_size (buffers[i]);
}
GST_DEBUG_OBJECT (sink,
"writing %u buffers at position %" G_GUINT64_FORMAT, num_buffers,
sink->current_pos);
flow =
gst_file_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
total_mems, total_size);
gst_writev_buffer_list (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
buffer_list, &bytes_written, 0, sink->max_transient_error_timeout,
sink->current_pos, &sink->flushing);
sink->current_pos += bytes_written;
return flow;
@ -885,10 +858,22 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
if (n_mem > 0 && (sync_after || !filesink->buffer)) {
flow = gst_file_sink_flush_buffer (filesink);
if (flow == GST_FLOW_OK)
if (flow == GST_FLOW_OK) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
gst_buffer_get_size (buffer), filesink->current_pos);
flow =
gst_file_sink_render_buffers (filesink, &buffer, 1, &n_mem, n_mem,
gst_buffer_get_size (buffer));
gst_writev_buffer (GST_OBJECT_CAST (filesink),
fileno (filesink->file), NULL, buffer, &bytes_written, 0,
filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
}
} else if (n_mem > 0) {
GST_DEBUG_OBJECT (filesink,
"Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %"