filesink: Implement buffering internally

We use writev() so every call ends up going to the kernel but for small
buffers we generally would prefer to do as few write calls as possible.

https://bugzilla.gnome.org/show_bug.cgi?id=794173
This commit is contained in:
Sebastian Dröge 2018-08-14 11:28:00 +03:00
parent e975e0cae8
commit 6b4fc62b7b
2 changed files with 159 additions and 12 deletions

View file

@ -183,6 +183,8 @@ static gboolean gst_file_sink_query (GstBaseSink * bsink, GstQuery * query);
static void gst_file_sink_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static GstFlowReturn gst_file_sink_flush_buffer (GstFileSink * filesink);
#define _do_init \
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_file_sink_uri_handler_init); \
GST_DEBUG_CATEGORY_INIT (gst_file_sink_debug, "filesink", 0, "filesink element");
@ -377,6 +379,18 @@ gst_file_sink_open_file (GstFileSink * sink)
/* try to seek in the file to figure out if it is seekable */
sink->seekable = gst_file_sink_do_seek (sink, 0);
if (sink->buffer)
gst_buffer_list_unref (sink->buffer);
sink->buffer = NULL;
if (sink->buffer_mode != GST_FILE_SINK_BUFFER_MODE_UNBUFFERED) {
if (sink->buffer_size == 0) {
sink->buffer_size = DEFAULT_BUFFER_SIZE;
g_object_notify (G_OBJECT (sink), "buffer-size");
}
sink->buffer = gst_buffer_list_new ();
}
GST_DEBUG_OBJECT (sink, "opened file %s, seekable %d",
sink->filename, sink->seekable);
@ -402,6 +416,10 @@ static void
gst_file_sink_close_file (GstFileSink * sink)
{
if (sink->file) {
if (gst_file_sink_flush_buffer (sink) != GST_FLOW_OK)
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE,
(_("Error closing file \"%s\"."), sink->filename), NULL);
if (fclose (sink->file) != 0)
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE,
(_("Error closing file \"%s\"."), sink->filename), GST_ERROR_SYSTEM);
@ -409,6 +427,11 @@ gst_file_sink_close_file (GstFileSink * sink)
GST_DEBUG_OBJECT (sink, "closed file");
sink->file = NULL;
}
if (sink->buffer) {
gst_buffer_list_unref (sink->buffer);
sink->buffer = NULL;
}
}
static gboolean
@ -477,6 +500,9 @@ gst_file_sink_do_seek (GstFileSink * filesink, guint64 new_offset)
GST_DEBUG_OBJECT (filesink, "Seeking to offset %" G_GUINT64_FORMAT
" using " __GST_STDIO_SEEK_FUNCTION, new_offset);
if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
goto flush_buffer_failed;
#ifdef HAVE_FSEEKO
if (fseeko (filesink->file, (off_t) new_offset, SEEK_SET) != 0)
goto seek_failed;
@ -496,6 +522,11 @@ gst_file_sink_do_seek (GstFileSink * filesink, guint64 new_offset)
return TRUE;
/* ERRORS */
flush_buffer_failed:
{
GST_DEBUG_OBJECT (filesink, "Flushing buffer failed");
return FALSE;
}
seek_failed:
{
GST_DEBUG_OBJECT (filesink, "Seeking failed: %s", g_strerror (errno));
@ -548,6 +579,10 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event)
goto truncate_failed;
}
break;
case GST_EVENT_EOS:
if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
goto flush_buffer_failed;
break;
default:
break;
}
@ -563,6 +598,13 @@ seek_failed:
gst_event_unref (event);
return FALSE;
}
flush_buffer_failed:
{
GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
(_("Error while writing to file \"%s\"."), filesink->filename), NULL);
gst_event_unref (event);
return FALSE;
}
truncate_failed:
{
GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
@ -578,6 +620,11 @@ gst_file_sink_get_current_offset (GstFileSink * filesink, guint64 * p_pos)
{
off_t ret = -1;
/* no need to flush internal buffer here as this is only called right
* after a seek. If this changes then the buffer should be flushed here
* too
*/
#ifdef HAVE_FTELLO
ret = ftello (filesink->file);
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
@ -605,17 +652,14 @@ gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
}
static GstFlowReturn
gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
gst_file_sink_render_list_internal (GstFileSink * sink,
GstBufferList * buffer_list)
{
GstFlowReturn flow;
GstBuffer **buffers;
GstFileSink *sink;
guint8 *mem_nums;
guint total_mems;
guint i, num_buffers;
gboolean sync_after = FALSE;
sink = GST_FILE_SINK_CAST (bsink);
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
@ -628,14 +672,102 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
buffers[i] = gst_buffer_list_get (buffer_list, i);
mem_nums[i] = gst_buffer_n_memory (buffers[i]);
total_mems += mem_nums[i];
if (GST_BUFFER_FLAG_IS_SET (buffers[i], GST_BUFFER_FLAG_SYNC_AFTER))
sync_after = TRUE;
}
flow =
gst_file_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
total_mems);
return flow;
no_data:
{
GST_LOG_OBJECT (sink, "empty buffer list");
return GST_FLOW_OK;
}
}
static GstFlowReturn
gst_file_sink_flush_buffer (GstFileSink * filesink)
{
GstFlowReturn flow_ret = GST_FLOW_OK;
if (filesink->buffer) {
guint length;
length = gst_buffer_list_length (filesink->buffer);
if (length > 0) {
GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %u",
filesink->current_buffer_size);
flow_ret =
gst_file_sink_render_list_internal (filesink, filesink->buffer);
/* Remove all buffers from the list but keep the list. This ensures that
* we don't re-allocate the array storing the buffers all the time */
gst_buffer_list_remove (filesink->buffer, 0, length);
filesink->current_buffer_size = 0;
}
}
return flow_ret;
}
static gboolean
has_sync_after_buffer (GstBuffer ** buffer, guint idx, gpointer user_data)
{
if (GST_BUFFER_FLAG_IS_SET (*buffer, GST_BUFFER_FLAG_SYNC_AFTER)) {
gboolean *sync_after = user_data;
*sync_after = TRUE;
return FALSE;
}
return TRUE;
}
static gboolean
accumulate_size (GstBuffer ** buffer, guint idx, gpointer user_data)
{
guint *size = user_data;
*size += gst_buffer_get_size (*buffer);
return TRUE;
}
static GstFlowReturn
gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
GstFlowReturn flow;
GstFileSink *sink;
guint i, num_buffers;
gboolean sync_after = FALSE;
sink = GST_FILE_SINK_CAST (bsink);
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
gst_buffer_list_foreach (buffer_list, has_sync_after_buffer, &sync_after);
if (sync_after || !sink->buffer) {
flow = gst_file_sink_render_list_internal (sink, buffer_list);
} else {
guint size = 0;
gst_buffer_list_foreach (buffer_list, accumulate_size, &size);
for (i = 0; i < num_buffers; ++i)
gst_buffer_list_add (sink->buffer,
gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
sink->current_buffer_size += size;
if (sink->current_buffer_size > sink->buffer_size)
flow = gst_file_sink_flush_buffer (sink);
else
flow = GST_FLOW_OK;
}
if (flow == GST_FLOW_OK && sync_after) {
if (fsync (fileno (sink->file))) {
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
@ -660,18 +792,30 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
GstFileSink *filesink;
GstFlowReturn flow;
guint8 n_mem;
gboolean sync_after;
filesink = GST_FILE_SINK_CAST (sink);
sync_after = GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_SYNC_AFTER);
n_mem = gst_buffer_n_memory (buffer);
if (n_mem > 0)
if (n_mem > 0 && (sync_after || !filesink->buffer)) {
flow = gst_file_sink_render_buffers (filesink, &buffer, 1, &n_mem, n_mem);
else
flow = GST_FLOW_OK;
} else if (n_mem > 0) {
if (flow == GST_FLOW_OK &&
GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_SYNC_AFTER)) {
filesink->current_buffer_size += gst_buffer_get_size (buffer);
gst_buffer_list_add (filesink->buffer, gst_buffer_ref (buffer));
if (filesink->current_buffer_size > filesink->buffer_size)
flow = gst_file_sink_flush_buffer (filesink);
else
flow = GST_FLOW_OK;
} else {
flow = GST_FLOW_OK;
}
if (flow == GST_FLOW_OK && sync_after) {
if (fsync (fileno (filesink->file))) {
GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
(_("Error while writing to file \"%s\"."), filesink->filename),

View file

@ -81,6 +81,9 @@ struct _GstFileSink {
gint buffer_mode;
guint buffer_size;
GstBufferList *buffer;
guint current_buffer_size;
gboolean append;
};