diff --git a/plugins/elements/gstelements_private.c b/plugins/elements/gstelements_private.c index 10955514db..b9a5517dc5 100644 --- a/plugins/elements/gstelements_private.c +++ b/plugins/elements/gstelements_private.c @@ -206,73 +206,29 @@ gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes) return written; } -static gsize -fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf) +static GstFlowReturn +gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset, + struct iovec *vecs, guint n_vecs, + guint64 * bytes_written, gint max_transient_error_timeout, + guint64 current_position, gboolean * flushing) { - GstMemory *mem; - gsize size = 0; - guint i; - - g_assert (gst_buffer_n_memory (buf) == n); - - for (i = 0; i < n; ++i) { - mem = gst_buffer_peek_memory (buf, i); - if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) { - vecs[i].iov_base = maps[i].data; - vecs[i].iov_len = maps[i].size; - } else { - GST_WARNING ("Failed to map memory %p for reading", mem); - vecs[i].iov_base = (void *) ""; - vecs[i].iov_len = 0; - } - size += vecs[i].iov_len; - } - - return size; -} - -GstFlowReturn -gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, - GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums, - guint total_mem_num, guint64 * bytes_written, guint64 skip, - gint max_transient_error_timeout, guint64 current_position, - gboolean * flushing) -{ - struct iovec *vecs; - GstMapInfo *map_infos; GstFlowReturn flow_ret; gsize size = 0; - guint i, j; gint64 start_time = 0; + *bytes_written = 0; max_transient_error_timeout *= 1000; if (max_transient_error_timeout) start_time = g_get_monotonic_time (); - GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num); - - vecs = g_newa (struct iovec, total_mem_num); - map_infos = g_newa (GstMapInfo, total_mem_num); - - /* populate output vectors */ - for (i = 0, j = 0; i < num_buffers; ++i) { - size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]); - j += mem_nums[i]; - } + GST_LOG_OBJECT (sink, "%u iovecs", n_vecs); /* now write it all out! */ { gssize ret, left; - guint n_vecs = total_mem_num; left = size; - if (skip) { - ret = skip; - errno = 0; - goto skip_first; - } - do { if (flushing != NULL && g_atomic_int_get (flushing)) { GST_DEBUG_OBJECT (sink, "Flushing, exiting loop"); @@ -299,20 +255,16 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, ret = gst_writev (fd, vecs, n_vecs, left); if (ret > 0) { - if (bytes_written) - *bytes_written += ret; + /* Wrote something, allow the caller to update the vecs passed here */ + *bytes_written = ret; + break; } - skip_first: - - if (ret == left) - break; - - if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) { /* do nothing, try again */ if (max_transient_error_timeout) start_time = g_get_monotonic_time (); - } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) { + } else if (errno == EACCES && max_transient_error_timeout > 0) { /* seek back to where we started writing and try again after sleeping * for 10ms. * @@ -338,31 +290,14 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, * happened and we have no idea how much and if what was written * is actually correct (it sometimes isn't) */ - ret = lseek (fd, current_position + *bytes_written, SEEK_SET); - if (ret < 0 || ret != current_position + *bytes_written) { + ret = lseek (fd, current_position, SEEK_SET); + if (ret < 0 || ret != current_position) { GST_ERROR_OBJECT (sink, "failed to seek back to current write position"); goto write_error; } - } else if (ret < 0) { + } else { goto write_error; - } else { /* if (ret < left) */ - if (max_transient_error_timeout) - start_time = g_get_monotonic_time (); - /* skip vectors that have been written in full */ - while (ret >= vecs[0].iov_len) { - ret -= vecs[0].iov_len; - left -= vecs[0].iov_len; - ++vecs; - --n_vecs; - } - g_assert (n_vecs > 0); - /* skip partially written vector data */ - if (ret > 0) { - vecs[0].iov_len -= ret; - vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret; - left -= ret; - } } #ifdef HAVE_WIN32 /* do short sleep on windows where we don't use gst_poll(), @@ -370,7 +305,6 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, if (fdset != NULL) g_usleep (1000); #endif - } while (left > 0); } @@ -379,9 +313,6 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, out: - for (i = 0; i < total_mem_num; ++i) - gst_memory_unmap (map_infos[i].memory, &map_infos[i]); - return flow_ret; /* ERRORS */ @@ -417,3 +348,287 @@ write_error: goto out; } } + +GstFlowReturn +gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset, + GstBuffer * buffer, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing) +{ + GstFlowReturn flow_ret = GST_FLOW_OK; + struct iovec *vecs; + GstMapInfo *maps; + guint i, num_mem, num_vecs; + gsize left; + + /* Buffers can contain up to 16 memories, so we can safely directly call + * writev() here without splitting up */ + g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX); + + num_mem = num_vecs = gst_buffer_n_memory (buffer); + + GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes", + buffer, num_mem, gst_buffer_get_size (buffer)); + + vecs = g_newa (struct iovec, num_mem); + maps = g_newa (GstMapInfo, num_mem); + + /* Map all memories */ + { + GstMemory *mem; + guint i; + + left = 0; + for (i = 0; i < num_mem; ++i) { + mem = gst_buffer_peek_memory (buffer, i); + if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) { + vecs[i].iov_base = maps[i].data; + vecs[i].iov_len = maps[i].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[i].iov_base = (void *) ""; + vecs[i].iov_len = 0; + } + left += vecs[i].iov_len; + } + } + + do { + guint64 bytes_written_local = 0; + + flow_ret = + gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, + &bytes_written_local, max_transient_error_timeout, current_position, + flushing); + + GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s", + bytes_written_local, left, gst_flow_get_name (flow_ret)); + + if (flow_ret != GST_FLOW_OK) { + g_assert (bytes_written_local == 0); + break; + } + + if (bytes_written) + *bytes_written += bytes_written_local; + + /* Done, no need to do bookkeeping */ + if (bytes_written_local == left) + break; + + /* skip vectors that have been written in full */ + while (bytes_written_local >= vecs[0].iov_len) { + bytes_written_local -= vecs[0].iov_len; + left -= vecs[0].iov_len; + ++vecs; + --num_vecs; + } + g_assert (num_vecs > 0); + /* skip partially written vector data */ + if (bytes_written_local > 0) { + vecs[0].iov_len -= bytes_written_local; + vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local; + left -= bytes_written_local; + } + } while (left > 0); + + for (i = 0; i < num_mem; i++) + gst_memory_unmap (maps[i].memory, &maps[i]); + + return flow_ret; +} + +GstFlowReturn +gst_write_mem (GstObject * sink, gint fd, GstPoll * fdset, + const guint8 * data, guint size, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing) +{ + GstFlowReturn flow_ret = GST_FLOW_OK; + struct iovec vec; + gsize left; + + GST_DEBUG ("Writing memory %p with %u bytes", data, size); + + vec.iov_len = size; + vec.iov_base = (guint8 *) data; + left = size; + + do { + guint64 bytes_written_local = 0; + + flow_ret = + gst_writev_iovecs (sink, fd, fdset, &vec, 1, + &bytes_written_local, max_transient_error_timeout, current_position, + flushing); + + GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s", + bytes_written_local, left, gst_flow_get_name (flow_ret)); + + if (flow_ret != GST_FLOW_OK) { + g_assert (bytes_written_local == 0); + break; + } + + if (bytes_written) + *bytes_written += bytes_written_local; + + /* skip partially written vector data */ + if (bytes_written_local < left) { + vec.iov_len -= bytes_written_local; + vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local; + left -= bytes_written_local; + } + } while (left > 0); + + return flow_ret; +} + +GstFlowReturn +gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset, + GstBufferList * buffer_list, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing) +{ + GstFlowReturn flow_ret = GST_FLOW_OK; + struct iovec *vecs; + GstMapInfo *maps; + guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0; + guint i, num_vecs; + gsize left; + + num_bufs = gst_buffer_list_length (buffer_list); + num_vecs = 0; + + GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs); + + vecs = g_newa (struct iovec, GST_IOV_MAX); + maps = g_newa (GstMapInfo, GST_IOV_MAX); + + /* Map the first GST_IOV_MAX memories */ + { + GstBuffer *buf; + GstMemory *mem; + guint j = 0; + + left = 0; + for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) { + guint num_mem; + + buf = gst_buffer_list_get (buffer_list, i); + num_mem = gst_buffer_n_memory (buf); + + for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) { + mem = gst_buffer_peek_memory (buf, j); + if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) { + vecs[num_vecs].iov_base = maps[num_vecs].data; + vecs[num_vecs].iov_len = maps[num_vecs].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[num_vecs].iov_base = (void *) ""; + vecs[num_vecs].iov_len = 0; + } + left += vecs[num_vecs].iov_len; + num_vecs++; + } + } + current_buf_idx = i; + current_buf_mem_idx = j; + } + + do { + guint64 bytes_written_local = 0; + guint vecs_written = 0; + + flow_ret = + gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, + &bytes_written_local, max_transient_error_timeout, current_position, + flushing); + + GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s", + bytes_written_local, left, gst_flow_get_name (flow_ret)); + + if (flow_ret != GST_FLOW_OK) { + g_assert (bytes_written_local == 0); + break; + } + + if (flow_ret != GST_FLOW_OK) { + g_assert (bytes_written_local == 0); + break; + } + + if (bytes_written) + *bytes_written += bytes_written_local; + + /* All done, no need for bookkeeping */ + if (bytes_written_local == left && current_buf_idx == num_bufs) + break; + + /* skip vectors that have been written in full */ + while (vecs_written < num_vecs + && bytes_written_local >= vecs[vecs_written].iov_len) { + bytes_written_local -= vecs[vecs_written].iov_len; + left -= vecs[vecs_written].iov_len; + vecs_written++; + } + g_assert (vecs_written < num_vecs || bytes_written_local == 0); + /* skip partially written vector data */ + if (bytes_written_local > 0) { + vecs[vecs_written].iov_len -= bytes_written_local; + vecs[vecs_written].iov_base = + ((guint8 *) vecs[0].iov_base) + bytes_written_local; + left -= bytes_written_local; + } + + /* If we have buffers left, fill them in now */ + if (current_buf_idx < num_bufs) { + GstBuffer *buf; + GstMemory *mem; + guint j = current_buf_mem_idx; + + /* Unmap the first vecs_written memories now */ + for (i = 0; i < vecs_written; i++) + gst_memory_unmap (maps[i].memory, &maps[i]); + /* Move upper remaining vecs and maps back to the beginning */ + memmove (vecs, &vecs[vecs_written], + (num_vecs - vecs_written) * sizeof (vecs[0])); + memmove (maps, &maps[vecs_written], + (num_vecs - vecs_written) * sizeof (maps[0])); + num_vecs -= vecs_written; + + /* And finally refill */ + for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) { + guint num_mem; + + buf = gst_buffer_list_get (buffer_list, i); + num_mem = gst_buffer_n_memory (buf); + + for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX; + j++) { + mem = gst_buffer_peek_memory (buf, j); + if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) { + vecs[num_vecs].iov_base = maps[num_vecs].data; + vecs[num_vecs].iov_len = maps[num_vecs].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[num_vecs].iov_base = (void *) ""; + vecs[num_vecs].iov_len = 0; + } + left += vecs[num_vecs].iov_len; + num_vecs++; + } + } + current_buf_idx = i; + current_buf_mem_idx = j; + } + } while (left > 0); + + for (i = 0; i < num_vecs; i++) + gst_memory_unmap (maps[i].memory, &maps[i]); + + return flow_ret; +} diff --git a/plugins/elements/gstelements_private.h b/plugins/elements/gstelements_private.h index 3dcb541ccd..98f694257f 100644 --- a/plugins/elements/gstelements_private.h +++ b/plugins/elements/gstelements_private.h @@ -34,12 +34,25 @@ G_GNUC_INTERNAL gchar * gst_buffer_get_meta_string (GstBuffer * buffer); G_GNUC_INTERNAL -GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, - GstBuffer ** buffers, guint num_buffers, - guint8 * mem_nums, guint total_mem_num, - guint64 * bytes_written, guint64 skip, - gint max_transient_error_timeout, guint64 current_position, - gboolean * flushing); +GstFlowReturn gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset, + GstBuffer * buffer, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing); + +G_GNUC_INTERNAL +GstFlowReturn gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset, + GstBufferList * buffer_list, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing); + +G_GNUC_INTERNAL +GstFlowReturn gst_write_mem (GstObject * sink, gint fd, GstPoll * fdset, + const guint8 *data, guint size, + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing); G_END_DECLS diff --git a/plugins/elements/gstfdsink.c b/plugins/elements/gstfdsink.c index be5ae28d41..f715ac13f0 100644 --- a/plugins/elements/gstfdsink.c +++ b/plugins/elements/gstfdsink.c @@ -243,18 +243,24 @@ gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query) } static GstFlowReturn -gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers, - guint num_buffers, guint8 * mem_nums, guint total_mems) +gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) { + GstFdSink *sink; GstFlowReturn ret; guint64 skip = 0; + guint num_buffers; + + sink = GST_FD_SINK_CAST (bsink); + + num_buffers = gst_buffer_list_length (buffer_list); + if (num_buffers == 0) + goto no_data; for (;;) { guint64 bytes_written = 0; - ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset, - buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip, - 0, -1, NULL); + ret = gst_writev_buffer_list (GST_OBJECT_CAST (sink), sink->fd, sink->fdset, + buffer_list, &bytes_written, skip, 0, -1, NULL); sink->bytes_written += bytes_written; sink->current_pos += bytes_written; @@ -269,38 +275,6 @@ gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers, } return ret; -} - -static GstFlowReturn -gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) -{ - GstFlowReturn flow; - GstBuffer **buffers; - GstFdSink *sink; - guint8 *mem_nums; - guint total_mems; - guint i, num_buffers; - - sink = GST_FD_SINK_CAST (bsink); - - num_buffers = gst_buffer_list_length (buffer_list); - if (num_buffers == 0) - goto no_data; - - /* extract buffers from list and count memories */ - buffers = g_newa (GstBuffer *, num_buffers); - mem_nums = g_newa (guint8, num_buffers); - for (i = 0, total_mems = 0; i < num_buffers; ++i) { - buffers[i] = gst_buffer_list_get (buffer_list, i); - mem_nums[i] = gst_buffer_n_memory (buffers[i]); - total_mems += mem_nums[i]; - } - - flow = - gst_fd_sink_render_buffers (sink, buffers, num_buffers, mem_nums, - total_mems); - - return flow; no_data: { @@ -312,20 +286,31 @@ no_data: static GstFlowReturn gst_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { - GstFlowReturn flow; GstFdSink *sink; - guint8 n_mem; + GstFlowReturn ret; + guint64 skip = 0; sink = GST_FD_SINK_CAST (bsink); - n_mem = gst_buffer_n_memory (buffer); + for (;;) { + guint64 bytes_written = 0; - if (n_mem > 0) - flow = gst_fd_sink_render_buffers (sink, &buffer, 1, &n_mem, n_mem); - else - flow = GST_FLOW_OK; + ret = gst_writev_buffer (GST_OBJECT_CAST (sink), sink->fd, sink->fdset, + buffer, &bytes_written, skip, 0, -1, NULL); - return flow; + sink->bytes_written += bytes_written; + sink->current_pos += bytes_written; + skip += bytes_written; + + if (!sink->unlock) + break; + + ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink)); + if (ret != GST_FLOW_OK) + return ret; + } + + return ret; } static gboolean diff --git a/plugins/elements/gstfilesink.c b/plugins/elements/gstfilesink.c index c8b427b076..aab3c9858c 100644 --- a/plugins/elements/gstfilesink.c +++ b/plugins/elements/gstfilesink.c @@ -699,55 +699,28 @@ gst_file_sink_get_current_offset (GstFileSink * filesink, guint64 * p_pos) return (ret != (off_t) - 1); } -static GstFlowReturn -gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers, - guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size) -{ - GstFlowReturn ret; - guint64 bytes_written = 0; - - GST_DEBUG_OBJECT (sink, - "writing %u buffers (%u memories, %" G_GSIZE_FORMAT - " bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size, - sink->current_pos); - - ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL, - buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0, - sink->max_transient_error_timeout, sink->current_pos, &sink->flushing); - - sink->current_pos += bytes_written; - - return ret; -} - static GstFlowReturn gst_file_sink_render_list_internal (GstFileSink * sink, GstBufferList * buffer_list) { GstFlowReturn flow; - GstBuffer **buffers; - guint8 *mem_nums; - guint total_mems; - gsize total_size = 0; - guint i, num_buffers; + guint64 bytes_written = 0; + guint num_buffers; num_buffers = gst_buffer_list_length (buffer_list); if (num_buffers == 0) goto no_data; - /* extract buffers from list and count memories */ - buffers = g_newa (GstBuffer *, num_buffers); - mem_nums = g_newa (guint8, num_buffers); - for (i = 0, total_mems = 0; i < num_buffers; ++i) { - buffers[i] = gst_buffer_list_get (buffer_list, i); - mem_nums[i] = gst_buffer_n_memory (buffers[i]); - total_mems += mem_nums[i]; - total_size += gst_buffer_get_size (buffers[i]); - } + GST_DEBUG_OBJECT (sink, + "writing %u buffers at position %" G_GUINT64_FORMAT, num_buffers, + sink->current_pos); flow = - gst_file_sink_render_buffers (sink, buffers, num_buffers, mem_nums, - total_mems, total_size); + gst_writev_buffer_list (GST_OBJECT_CAST (sink), fileno (sink->file), NULL, + buffer_list, &bytes_written, 0, sink->max_transient_error_timeout, + sink->current_pos, &sink->flushing); + + sink->current_pos += bytes_written; return flow; @@ -885,10 +858,22 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer) if (n_mem > 0 && (sync_after || !filesink->buffer)) { flow = gst_file_sink_flush_buffer (filesink); - if (flow == GST_FLOW_OK) + if (flow == GST_FLOW_OK) { + guint64 bytes_written = 0; + + GST_DEBUG_OBJECT (sink, + "writing buffer ( %" G_GSIZE_FORMAT + " bytes) at position %" G_GUINT64_FORMAT, + gst_buffer_get_size (buffer), filesink->current_pos); + flow = - gst_file_sink_render_buffers (filesink, &buffer, 1, &n_mem, n_mem, - gst_buffer_get_size (buffer)); + gst_writev_buffer (GST_OBJECT_CAST (filesink), + fileno (filesink->file), NULL, buffer, &bytes_written, 0, + filesink->max_transient_error_timeout, filesink->current_pos, + &filesink->flushing); + + filesink->current_pos += bytes_written; + } } else if (n_mem > 0) { GST_DEBUG_OBJECT (filesink, "Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %"