diff --git a/plugins/elements/gstelements_private.c b/plugins/elements/gstelements_private.c index 2c0ca580ac..1e25f88bd5 100644 --- a/plugins/elements/gstelements_private.c +++ b/plugins/elements/gstelements_private.c @@ -32,6 +32,7 @@ #ifdef HAVE_SYS_UIO_H #include #endif +#include #include #include #include @@ -39,6 +40,11 @@ #include "gstelements_private.h" #ifdef G_OS_WIN32 +# include /* lseek, open, close, read */ +# undef lseek +# define lseek _lseeki64 +# undef off_t +# define off_t guint64 # define WIN32_LEAN_AND_MEAN /* prevents from including too many things */ # include # undef WIN32_LEAN_AND_MEAN @@ -215,13 +221,20 @@ fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf) GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums, - guint total_mem_num, guint64 * bytes_written, guint64 skip) + guint total_mem_num, guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing) { struct iovec *vecs; GstMapInfo *map_infos; GstFlowReturn flow_ret; gsize size = 0; guint i, j; + gint64 start_time = 0; + + max_transient_error_timeout *= 1000; + if (max_transient_error_timeout) + start_time = g_get_monotonic_time (); GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num); @@ -248,6 +261,11 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, } do { + if (flushing != NULL && g_atomic_int_get (flushing)) { + GST_DEBUG_OBJECT (sink, "Flushing, exiting loop"); + flow_ret = GST_FLOW_FLUSHING; + goto out; + } #ifndef HAVE_WIN32 if (fdset != NULL) { do { @@ -279,9 +297,45 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { /* do nothing, try again */ + if (max_transient_error_timeout) + start_time = g_get_monotonic_time (); + } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) { + /* seek back to where we started writing and try again after sleeping + * for 10ms. + * + * Some network file systems report EACCES spuriously, presumably + * because at the same time another client is reading the file. + * It happens at least on Linux and macOS on SMB/CIFS and NFS file + * systems. + * + * Note that NFS does not check access permissions during open() + * but only on write()/read() according to open(2), so we would + * loop here in case of NFS. + */ + if (g_get_monotonic_time () > start_time + max_transient_error_timeout) { + GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing", + max_transient_error_timeout); + goto write_error; + } + GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep"); + g_assert (current_position != -1); + g_usleep (10000); + + /* Seek back to the current position, sometimes a partial write + * happened and we have no idea how much and if what was written + * is actually correct (it sometimes isn't) + */ + ret = lseek (fd, current_position + *bytes_written, SEEK_SET); + if (ret < 0 || ret != current_position + *bytes_written) { + GST_ERROR_OBJECT (sink, + "failed to seek back to current write position"); + goto write_error; + } } else if (ret < 0) { goto write_error; - } else if (ret < left) { + } else { /* if (ret < left) */ + if (max_transient_error_timeout) + start_time = g_get_monotonic_time (); /* skip vectors that have been written in full */ while (ret >= vecs[0].iov_len) { ret -= vecs[0].iov_len; diff --git a/plugins/elements/gstelements_private.h b/plugins/elements/gstelements_private.h index d196594059..3dcb541ccd 100644 --- a/plugins/elements/gstelements_private.h +++ b/plugins/elements/gstelements_private.h @@ -37,7 +37,9 @@ G_GNUC_INTERNAL GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset, GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums, guint total_mem_num, - guint64 * bytes_written, guint64 skip); + guint64 * bytes_written, guint64 skip, + gint max_transient_error_timeout, guint64 current_position, + gboolean * flushing); G_END_DECLS diff --git a/plugins/elements/gstfdsink.c b/plugins/elements/gstfdsink.c index 7641cde8df..8f9996f293 100644 --- a/plugins/elements/gstfdsink.c +++ b/plugins/elements/gstfdsink.c @@ -249,7 +249,8 @@ gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers, guint64 bytes_written = 0; ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset, - buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip); + buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip, + 0, -1, NULL); sink->bytes_written += bytes_written; sink->current_pos += bytes_written; diff --git a/plugins/elements/gstfilesink.c b/plugins/elements/gstfilesink.c index c722759716..024004897b 100644 --- a/plugins/elements/gstfilesink.c +++ b/plugins/elements/gstfilesink.c @@ -49,6 +49,7 @@ #include "gstfilesink.h" #include #include +#include #ifdef G_OS_WIN32 #include /* lseek, open, close, read */ @@ -106,6 +107,8 @@ GST_DEBUG_CATEGORY_STATIC (gst_file_sink_debug); #define DEFAULT_BUFFER_MODE GST_FILE_SINK_BUFFER_MODE_DEFAULT #define DEFAULT_BUFFER_SIZE 64 * 1024 #define DEFAULT_APPEND FALSE +#define DEFAULT_O_SYNC FALSE +#define DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT 0 enum { @@ -114,6 +117,8 @@ enum PROP_BUFFER_MODE, PROP_BUFFER_SIZE, PROP_APPEND, + PROP_O_SYNC, + PROP_MAX_TRANSIENT_ERROR_TIMEOUT, PROP_LAST }; @@ -121,12 +126,12 @@ enum * use the 'file pointer' opened in glib (and returned from this function) * in this library, as they may have unrelated C runtimes. */ static FILE * -gst_fopen (const gchar * filename, const gchar * mode) +gst_fopen (const gchar * filename, const gchar * mode, gboolean o_sync) { + FILE *retval; #ifdef G_OS_WIN32 wchar_t *wfilename = g_utf8_to_utf16 (filename, -1, NULL, NULL, NULL); wchar_t *wmode; - FILE *retval; int save_errno; if (wfilename == NULL) { @@ -151,7 +156,23 @@ gst_fopen (const gchar * filename, const gchar * mode) errno = save_errno; return retval; #else - return fopen (filename, mode); + int fd; + int flags = O_CREAT | O_WRONLY; + + if (strcmp (mode, "wb") == 0) + flags |= O_TRUNC; + else if (strcmp (mode, "ab") == 0) + flags |= O_APPEND; + else + g_assert_not_reached (); + + if (o_sync) + flags |= O_SYNC; + + fd = open (filename, flags, 0666); + + retval = fdopen (fd, mode); + return retval; #endif } @@ -172,6 +193,8 @@ static GstFlowReturn gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer); static GstFlowReturn gst_file_sink_render_list (GstBaseSink * sink, GstBufferList * list); +static gboolean gst_file_sink_unlock (GstBaseSink * sink); +static gboolean gst_file_sink_unlock_stop (GstBaseSink * sink); static gboolean gst_file_sink_do_seek (GstFileSink * filesink, guint64 new_offset); @@ -230,6 +253,19 @@ gst_file_sink_class_init (GstFileSinkClass * klass) "Append to an already existing file", DEFAULT_APPEND, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_O_SYNC, + g_param_spec_boolean ("o-sync", "Synchronous IO", + "Open the file with O_SYNC for enabling synchronous IO", + DEFAULT_O_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_TRANSIENT_ERROR_TIMEOUT, + g_param_spec_int ("max-transient-error-timeout", + "Max Transient Error Timeout", + "Retry up to this many ms on transient errors (currently EACCES)", 0, + G_MAXINT, DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gst_element_class_set_static_metadata (gstelement_class, "File Sink", "Sink/File", "Write stream to a file", @@ -243,6 +279,9 @@ gst_file_sink_class_init (GstFileSinkClass * klass) gstbasesink_class->render_list = GST_DEBUG_FUNCPTR (gst_file_sink_render_list); gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_file_sink_event); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_file_sink_unlock); + gstbasesink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_file_sink_unlock_stop); if (sizeof (off_t) < 8) { GST_LOG ("No large file support, sizeof (off_t) = %" G_GSIZE_FORMAT "!", @@ -330,6 +369,12 @@ gst_file_sink_set_property (GObject * object, guint prop_id, case PROP_APPEND: sink->append = g_value_get_boolean (value); break; + case PROP_O_SYNC: + sink->o_sync = g_value_get_boolean (value); + break; + case PROP_MAX_TRANSIENT_ERROR_TIMEOUT: + sink->max_transient_error_timeout = g_value_get_int (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -355,6 +400,12 @@ gst_file_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_APPEND: g_value_set_boolean (value, sink->append); break; + case PROP_O_SYNC: + g_value_set_boolean (value, sink->o_sync); + break; + case PROP_MAX_TRANSIENT_ERROR_TIMEOUT: + g_value_set_int (value, sink->max_transient_error_timeout); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -369,9 +420,9 @@ gst_file_sink_open_file (GstFileSink * sink) goto no_filename; if (sink->append) - sink->file = gst_fopen (sink->filename, "ab"); + sink->file = gst_fopen (sink->filename, "ab", sink->o_sync); else - sink->file = gst_fopen (sink->filename, "wb"); + sink->file = gst_fopen (sink->filename, "wb", sink->o_sync); if (sink->file == NULL) goto open_failed; @@ -652,13 +703,21 @@ static GstFlowReturn gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size) { + GstFlowReturn ret; + guint64 bytes_written = 0; + GST_DEBUG_OBJECT (sink, "writing %u buffers (%u memories, %" G_GSIZE_FORMAT " bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size, sink->current_pos); - return gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL, - buffers, num_buffers, mem_nums, total_mems, &sink->current_pos, 0); + ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL, + buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0, + sink->max_transient_error_timeout, sink->current_pos, &sink->flushing); + + sink->current_pos += bytes_written; + + return ret; } static GstFlowReturn @@ -857,13 +916,45 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer) static gboolean gst_file_sink_start (GstBaseSink * basesink) { - return gst_file_sink_open_file (GST_FILE_SINK (basesink)); + GstFileSink *filesink; + + filesink = GST_FILE_SINK_CAST (basesink); + + g_atomic_int_set (&filesink->flushing, FALSE); + return gst_file_sink_open_file (filesink); } static gboolean gst_file_sink_stop (GstBaseSink * basesink) { - gst_file_sink_close_file (GST_FILE_SINK (basesink)); + GstFileSink *filesink; + + filesink = GST_FILE_SINK_CAST (basesink); + + gst_file_sink_close_file (filesink); + g_atomic_int_set (&filesink->flushing, TRUE); + return TRUE; +} + +static gboolean +gst_file_sink_unlock (GstBaseSink * basesink) +{ + GstFileSink *filesink; + + filesink = GST_FILE_SINK_CAST (basesink); + g_atomic_int_set (&filesink->flushing, TRUE); + + return TRUE; +} + +static gboolean +gst_file_sink_unlock_stop (GstBaseSink * basesink) +{ + GstFileSink *filesink; + + filesink = GST_FILE_SINK_CAST (basesink); + g_atomic_int_set (&filesink->flushing, FALSE); + return TRUE; } diff --git a/plugins/elements/gstfilesink.h b/plugins/elements/gstfilesink.h index c41a9a2b3d..a5dc90bfb9 100644 --- a/plugins/elements/gstfilesink.h +++ b/plugins/elements/gstfilesink.h @@ -85,6 +85,10 @@ struct _GstFileSink { guint current_buffer_size; gboolean append; + gboolean o_sync; + gint max_transient_error_timeout; + + gboolean flushing; }; struct _GstFileSinkClass {