filesink: port over unlock code from fdsink

See also: 5216322d39

The previous code was causing "random" flushing returns
in scenarios with intensive state changes such as within
a buffering pipeline.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/476>
This commit is contained in:
Mathieu Duponchelle 2020-05-12 00:54:56 +02:00 committed by GStreamer Merge Bot
parent d0e430a690
commit 710ce84d0c

View file

@ -719,8 +719,8 @@ gst_file_sink_render_list_internal (GstFileSink * sink,
GstBufferList * buffer_list)
{
GstFlowReturn flow;
guint64 bytes_written = 0;
guint num_buffers;
guint64 skip = 0;
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
@ -730,12 +730,25 @@ gst_file_sink_render_list_internal (GstFileSink * sink,
"writing %u buffers at position %" G_GUINT64_FORMAT, num_buffers,
sink->current_pos);
flow =
gst_writev_buffer_list (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
buffer_list, &bytes_written, 0, sink->max_transient_error_timeout,
sink->current_pos, &sink->flushing);
for (;;) {
guint64 bytes_written = 0;
sink->current_pos += bytes_written;
flow =
gst_writev_buffer_list (GST_OBJECT_CAST (sink), fileno (sink->file),
NULL, buffer_list, &bytes_written, skip,
sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
sink->current_pos += bytes_written;
skip += bytes_written;
if (flow != GST_FLOW_FLUSHING)
break;
flow = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
if (flow != GST_FLOW_OK)
return flow;
}
return flow;
@ -755,16 +768,27 @@ gst_file_sink_flush_buffer (GstFileSink * filesink)
filesink->current_buffer_size);
if (filesink->buffer && filesink->current_buffer_size) {
guint64 bytes_written = 0;
guint64 skip = 0;
flow_ret =
gst_writev_mem (GST_OBJECT_CAST (filesink), fileno (filesink->file),
NULL, filesink->buffer, filesink->current_buffer_size, &bytes_written,
0, filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
for (;;) {
guint64 bytes_written = 0;
filesink->current_pos += bytes_written;
flow_ret =
gst_writev_mem (GST_OBJECT_CAST (filesink), fileno (filesink->file),
NULL, filesink->buffer, filesink->current_buffer_size, &bytes_written,
skip, filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
skip += bytes_written;
if (flow_ret != GST_FLOW_FLUSHING)
break;
flow_ret = gst_base_sink_wait_preroll (GST_BASE_SINK (filesink));
if (flow_ret != GST_FLOW_OK)
break;
}
} else if (filesink->buffer_list && filesink->current_buffer_size) {
guint length;
@ -807,6 +831,35 @@ accumulate_size (GstBuffer ** buffer, guint idx, gpointer user_data)
return TRUE;
}
static GstFlowReturn
render_buffer (GstFileSink * filesink, GstBuffer * buffer)
{
GstFlowReturn flow;
guint64 bytes_written = 0;
guint64 skip = 0;
for (;;) {
flow =
gst_writev_buffer (GST_OBJECT_CAST (filesink),
fileno (filesink->file), NULL, buffer, &bytes_written, skip,
filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
skip += bytes_written;
if (flow != GST_FLOW_FLUSHING)
break;
flow = gst_base_sink_wait_preroll (GST_BASE_SINK (filesink));
if (flow != GST_FLOW_OK)
break;
}
return flow;
}
static GstFlowReturn
gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
@ -851,20 +904,12 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
}
if (buffer_size > sink->allocated_buffer_size) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
buffer_size, sink->current_pos);
flow =
gst_writev_buffer (GST_OBJECT_CAST (sink),
fileno (sink->file), NULL, buffer, &bytes_written, 0,
sink->max_transient_error_timeout, sink->current_pos,
&sink->flushing);
sink->current_pos += bytes_written;
flow = render_buffer (sink, buffer);
} else {
sink->current_buffer_size +=
gst_buffer_extract (buffer, 0,
@ -925,20 +970,7 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
&& !filesink->buffer_list))) {
flow = gst_file_sink_flush_buffer (filesink);
if (flow == GST_FLOW_OK) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
gst_buffer_get_size (buffer), filesink->current_pos);
flow =
gst_writev_buffer (GST_OBJECT_CAST (filesink),
fileno (filesink->file), NULL, buffer, &bytes_written, 0,
filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
flow = render_buffer (filesink, buffer);
}
} else if (n_mem > 0) {
gsize size = gst_buffer_get_size (buffer);
@ -957,20 +989,12 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
}
if (size > filesink->allocated_buffer_size) {
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing buffer ( %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT,
size, filesink->current_pos);
flow =
gst_writev_buffer (GST_OBJECT_CAST (filesink),
fileno (filesink->file), NULL, buffer, &bytes_written, 0,
filesink->max_transient_error_timeout, filesink->current_pos,
&filesink->flushing);
filesink->current_pos += bytes_written;
flow = render_buffer (filesink, buffer);
} else {
filesink->current_buffer_size +=
gst_buffer_extract (buffer, 0,
@ -1024,7 +1048,6 @@ gst_file_sink_stop (GstBaseSink * basesink)
filesink = GST_FILE_SINK_CAST (basesink);
gst_file_sink_close_file (filesink);
g_atomic_int_set (&filesink->flushing, TRUE);
return TRUE;
}