filesink: Add a new full buffer mode to filesink

Previously the default and full modes were the same. Now the default
mode is like before: it accumulates all buffers in a buffer list until
the threshold is reached and then writes them all out, potentially in
multiple writes.

The new full mode works by always copying memory to a single memory area
and writing everything out with a single write once the threshold is
reached.
This commit is contained in:
Sebastian Dröge 2020-03-20 19:28:37 +02:00 committed by GStreamer Merge Bot
parent 93ffec7b2e
commit dcb2be3282
2 changed files with 139 additions and 33 deletions

View file

@ -431,15 +431,24 @@ gst_file_sink_open_file (GstFileSink * sink)
sink->seekable = gst_file_sink_do_seek (sink, 0);
if (sink->buffer)
gst_buffer_list_unref (sink->buffer);
g_free (sink->buffer);
sink->buffer = NULL;
if (sink->buffer_list)
gst_buffer_list_unref (sink->buffer_list);
sink->buffer_list = NULL;
if (sink->buffer_mode != GST_FILE_SINK_BUFFER_MODE_UNBUFFERED) {
if (sink->buffer_size == 0) {
sink->buffer_size = DEFAULT_BUFFER_SIZE;
g_object_notify (G_OBJECT (sink), "buffer-size");
}
sink->buffer = gst_buffer_list_new ();
if (sink->buffer_mode == GST_FILE_SINK_BUFFER_MODE_FULL) {
sink->buffer = g_malloc (sink->buffer_size);
sink->allocated_buffer_size = sink->buffer_size;
} else {
sink->buffer_list = gst_buffer_list_new ();
}
sink->current_buffer_size = 0;
}
@ -481,9 +490,15 @@ gst_file_sink_close_file (GstFileSink * sink)
}
if (sink->buffer) {
gst_buffer_list_unref (sink->buffer);
g_free (sink->buffer);
sink->buffer = NULL;
}
sink->allocated_buffer_size = 0;
if (sink->buffer_list) {
gst_buffer_list_unref (sink->buffer_list);
sink->buffer_list = NULL;
}
sink->current_buffer_size = 0;
}
@ -633,11 +648,11 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event)
if (ftruncate (fileno (filesink->file), 0))
goto truncate_failed;
}
if (filesink->buffer) {
gst_buffer_list_unref (filesink->buffer);
filesink->buffer = gst_buffer_list_new ();
filesink->current_buffer_size = 0;
if (filesink->buffer_list) {
gst_buffer_list_unref (filesink->buffer_list);
filesink->buffer_list = gst_buffer_list_new ();
}
filesink->current_buffer_size = 0;
break;
case GST_EVENT_EOS:
if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
@ -736,23 +751,36 @@ gst_file_sink_flush_buffer (GstFileSink * filesink)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
if (filesink->buffer) {
GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %" G_GSIZE_FORMAT,
filesink->current_buffer_size);
if (filesink->buffer && filesink->current_buffer_size) {
guint64 bytes_written = 0;
flow_ret =
gst_writev_mem (GST_OBJECT_CAST (filesink), fileno (filesink->file),
NULL, filesink->buffer, filesink->current_buffer_size, &bytes_written,
0, filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
} else if (filesink->buffer_list && filesink->current_buffer_size) {
guint length;
length = gst_buffer_list_length (filesink->buffer);
length = gst_buffer_list_length (filesink->buffer_list);
if (length > 0) {
GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %u",
filesink->current_buffer_size);
flow_ret =
gst_file_sink_render_list_internal (filesink, filesink->buffer);
gst_file_sink_render_list_internal (filesink, filesink->buffer_list);
/* Remove all buffers from the list but keep the list. This ensures that
* we don't re-allocate the array storing the buffers all the time */
gst_buffer_list_remove (filesink->buffer, 0, length);
filesink->current_buffer_size = 0;
gst_buffer_list_remove (filesink->buffer_list, 0, length);
}
}
filesink->current_buffer_size = 0;
return flow_ret;
}
@ -796,7 +824,7 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
gst_buffer_list_foreach (buffer_list, has_sync_after_buffer, &sync_after);
if (sync_after || !sink->buffer) {
if (sync_after || (!sink->buffer && !sink->buffer_list)) {
flow = gst_file_sink_flush_buffer (sink);
if (flow == GST_FLOW_OK)
flow = gst_file_sink_render_list_internal (sink, buffer_list);
@ -809,15 +837,52 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
G_GUINT64_FORMAT, size, num_buffers,
sink->current_pos + sink->current_buffer_size);
for (i = 0; i < num_buffers; ++i)
gst_buffer_list_add (sink->buffer,
gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
sink->current_buffer_size += size;
if (sink->current_buffer_size > sink->buffer_size)
flow = gst_file_sink_flush_buffer (sink);
else
if (sink->buffer) {
flow = GST_FLOW_OK;
for (i = 0; i < num_buffers && flow == GST_FLOW_OK; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
gsize buffer_size = gst_buffer_get_size (buffer);
if (sink->current_buffer_size + buffer_size >
sink->allocated_buffer_size) {
flow = gst_file_sink_flush_buffer (sink);
if (flow != GST_FLOW_OK)
return flow;
}
if (buffer_size > sink->allocated_buffer_size) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
buffer_size, sink->current_pos);
flow =
gst_writev_buffer (GST_OBJECT_CAST (sink),
fileno (sink->file), NULL, buffer, &bytes_written, 0,
sink->max_transient_error_timeout, sink->current_pos,
&sink->flushing);
sink->current_pos += bytes_written;
} else {
sink->current_buffer_size +=
gst_buffer_extract (buffer, 0,
sink->buffer + sink->current_buffer_size, buffer_size);
flow = GST_FLOW_OK;
}
}
} else {
for (i = 0; i < num_buffers; ++i)
gst_buffer_list_add (sink->buffer_list,
gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
sink->current_buffer_size += size;
if (sink->current_buffer_size > sink->buffer_size)
flow = gst_file_sink_flush_buffer (sink);
else
flow = GST_FLOW_OK;
}
}
if (flow == GST_FLOW_OK && sync_after) {
@ -856,7 +921,8 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
n_mem = gst_buffer_n_memory (buffer);
if (n_mem > 0 && (sync_after || !filesink->buffer)) {
if (n_mem > 0 && (sync_after || (!filesink->buffer
&& !filesink->buffer_list))) {
flow = gst_file_sink_flush_buffer (filesink);
if (flow == GST_FLOW_OK) {
guint64 bytes_written = 0;
@ -875,18 +941,51 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
filesink->current_pos += bytes_written;
}
} else if (n_mem > 0) {
gsize size = gst_buffer_get_size (buffer);
GST_DEBUG_OBJECT (filesink,
"Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %"
G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
G_GUINT64_FORMAT, size,
filesink->current_pos + filesink->current_buffer_size);
filesink->current_buffer_size += gst_buffer_get_size (buffer);
gst_buffer_list_add (filesink->buffer, gst_buffer_ref (buffer));
if (filesink->buffer) {
if (filesink->current_buffer_size + size >
filesink->allocated_buffer_size) {
flow = gst_file_sink_flush_buffer (filesink);
if (flow != GST_FLOW_OK)
return flow;
}
if (filesink->current_buffer_size > filesink->buffer_size)
flow = gst_file_sink_flush_buffer (filesink);
else
flow = GST_FLOW_OK;
if (size > filesink->allocated_buffer_size) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
size, filesink->current_pos);
flow =
gst_writev_buffer (GST_OBJECT_CAST (filesink),
fileno (filesink->file), NULL, buffer, &bytes_written, 0,
filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
} else {
filesink->current_buffer_size +=
gst_buffer_extract (buffer, 0,
filesink->buffer + filesink->current_buffer_size, size);
flow = GST_FLOW_OK;
}
} else {
filesink->current_buffer_size += gst_buffer_get_size (buffer);
gst_buffer_list_add (filesink->buffer_list, gst_buffer_ref (buffer));
if (filesink->current_buffer_size > filesink->buffer_size)
flow = gst_file_sink_flush_buffer (filesink);
else
flow = GST_FLOW_OK;
}
} else {
flow = GST_FLOW_OK;
}

View file

@ -81,8 +81,15 @@ struct _GstFileSink {
gint buffer_mode;
guint buffer_size;
GstBufferList *buffer;
guint current_buffer_size;
/* For default buffer mode */
GstBufferList *buffer_list;
/* For full buffer mode */
guint8 *buffer;
gsize allocated_buffer_size;
/* For default/full buffer mode */
gsize current_buffer_size;
gboolean append;
gboolean o_sync;