From 5432f712bb5a104576e5c6826d343c0866fe9d34 Mon Sep 17 00:00:00 2001 From: David Schleef Date: Fri, 13 Feb 2004 20:16:42 +0000 Subject: [PATCH] gst/elements/gstfdsrc.*: Adds timeout property to fdsrc, and sends an EOS event if file descriptor reading times out. Original commit message from CVS: reviewed by: David Schleef * gst/elements/gstfdsrc.c: (gst_fdsrc_class_init), (gst_fdsrc_init), (gst_fdsrc_set_property), (gst_fdsrc_get_property), (gst_fdsrc_get): * gst/elements/gstfdsrc.h: Adds timeout property to fdsrc, and sends an EOS event if file descriptor reading times out. --- ChangeLog | 10 +++++ gst/elements/gstfdsrc.c | 88 ++++++++++++++++++++++++++++++------- gst/elements/gstfdsrc.h | 12 +++-- plugins/elements/gstfdsrc.c | 88 ++++++++++++++++++++++++++++++------- plugins/elements/gstfdsrc.h | 12 +++-- 5 files changed, 168 insertions(+), 42 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3f89aec5ea..728e3b3afb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2004-02-13 Mattias Wadman + + reviewed by: David Schleef + + * gst/elements/gstfdsrc.c: (gst_fdsrc_class_init), + (gst_fdsrc_init), (gst_fdsrc_set_property), + (gst_fdsrc_get_property), (gst_fdsrc_get): + * gst/elements/gstfdsrc.h: Adds timeout property to fdsrc, + and sends an EOS event if file descriptor reading times out. + 2004-02-13 Thomas Vander Stichele * configure.ac: diff --git a/gst/elements/gstfdsrc.c b/gst/elements/gstfdsrc.c index ffabc94d80..6cb2ee82c6 100644 --- a/gst/elements/gstfdsrc.c +++ b/gst/elements/gstfdsrc.c @@ -26,6 +26,7 @@ #include #include #include +#include #ifdef HAVE_CONFIG_H # include "config.h" @@ -48,16 +49,19 @@ GstElementDetails gst_fdsrc_details = GST_ELEMENT_DETAILS ( /* FdSrc signals and args */ enum { - /* FILL ME */ + SIGNAL_TIMEOUT, LAST_SIGNAL }; enum { ARG_0, ARG_FD, - ARG_BLOCKSIZE + ARG_BLOCKSIZE, + ARG_TIMEOUT }; +static guint gst_fdsrc_signals[LAST_SIGNAL] = { 0 }; + #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_fdsrc_debug, "fdsrc", 0, "fdsrc element"); @@ -84,7 +88,6 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) GObjectClass *gobject_class; gobject_class = G_OBJECT_CLASS (klass); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_FD, g_param_spec_int ("fd", "fd", "An open file descriptor to read from", @@ -92,6 +95,14 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCKSIZE, g_param_spec_ulong ("blocksize", "Block size", "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, + g_param_spec_uint64 ("timeout", "Timeout", "Read timeout in nanoseconds", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); + + gst_fdsrc_signals[SIGNAL_TIMEOUT] = + g_signal_new ("timeout", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstFdSrcClass, timeout), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); gobject_class->set_property = gst_fdsrc_set_property; gobject_class->get_property = gst_fdsrc_get_property; @@ -106,6 +117,7 @@ static void gst_fdsrc_init(GstFdSrc *fdsrc) { fdsrc->fd = 0; fdsrc->curoffset = 0; fdsrc->blocksize = DEFAULT_BLOCKSIZE; + fdsrc->timeout = 0; fdsrc->seq = 0; } @@ -127,6 +139,9 @@ gst_fdsrc_set_property (GObject *object, guint prop_id, const GValue *value, GPa case ARG_BLOCKSIZE: src->blocksize = g_value_get_ulong (value); break; + case ARG_TIMEOUT: + src->timeout = g_value_get_uint64 (value); + break; default: break; } @@ -149,6 +164,9 @@ gst_fdsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe case ARG_FD: g_value_set_int (value, src->fd); break; + case ARG_TIMEOUT: + g_value_set_uint64 (value, src->timeout); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -161,32 +179,68 @@ gst_fdsrc_get(GstPad *pad) GstFdSrc *src; GstBuffer *buf; glong readbytes; + fd_set readfds; + struct timeval t, *tp = &t; + gint retval; src = GST_FDSRC (gst_pad_get_parent (pad)); /* create the buffer */ buf = gst_buffer_new_and_alloc (src->blocksize); - /* read it in from the file */ - readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + FD_ZERO (&readfds); + FD_SET (src->fd, &readfds); - /* if nothing was read, we're in eos */ - if (readbytes == 0) { + if (src->timeout != 0) + { + tp->tv_sec = src->timeout / 1000000000; + tp->tv_usec = src->timeout - t.tv_sec * 1000000000; + } + else + tp = NULL; + + do + { + retval = select (1, &readfds, NULL, NULL, tp); + } while (retval == -1 && errno == EINTR); /* retry if interrupted */ + + if (retval == -1) + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("select on file descriptor: %s.", g_strerror(errno))); gst_element_set_eos (GST_ELEMENT (src)); return GST_DATA (gst_event_new (GST_EVENT_EOS)); } + else if (retval == 0) + { + g_signal_emit (G_OBJECT (src), gst_fdsrc_signals[SIGNAL_TIMEOUT], 0); + gst_element_set_eos (GST_ELEMENT (src)); + return GST_DATA(gst_event_new (GST_EVENT_EOS)); + } - if (readbytes == -1) { - g_error ("Error reading from file descriptor. Ending stream.\n"); + do + { + readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + } while (readbytes == -1 && errno == EINTR); /* retry if interrupted */ + + if (readbytes > 0) + { + GST_BUFFER_OFFSET (buf) = src->curoffset; + GST_BUFFER_SIZE (buf) = readbytes; + GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; + src->curoffset += readbytes; + + /* we're done, return the buffer */ + return GST_DATA (buf); + } + else if (readbytes == 0) + { + gst_element_set_eos (GST_ELEMENT (src)); + return GST_DATA (gst_event_new (GST_EVENT_EOS)); + } + else + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("read on file descriptor: %s.", g_strerror(errno))); gst_element_set_eos (GST_ELEMENT (src)); return GST_DATA (gst_event_new (GST_EVENT_EOS)); } - - GST_BUFFER_OFFSET (buf) = src->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; - src->curoffset += readbytes; - - /* we're done, return the buffer */ - return GST_DATA (buf); } diff --git a/gst/elements/gstfdsrc.h b/gst/elements/gstfdsrc.h index b004e0ba4a..3bb9e7812e 100644 --- a/gst/elements/gstfdsrc.h +++ b/gst/elements/gstfdsrc.h @@ -52,14 +52,18 @@ struct _GstFdSrc { /* fd */ gint fd; - gulong curoffset; /* current offset in file */ - gulong blocksize; /* bytes per read */ - - gulong seq; /* buffer sequence number */ + gulong curoffset; /* current offset in file */ + gulong blocksize; /* bytes per read */ + guint64 timeout; /* read timeout, in nanoseconds */ + + gulong seq; /* buffer sequence number */ }; struct _GstFdSrcClass { GstElementClass parent_class; + + /* signals */ + void (*timeout) (GstElement *element); }; GType gst_fdsrc_get_type(void); diff --git a/plugins/elements/gstfdsrc.c b/plugins/elements/gstfdsrc.c index ffabc94d80..6cb2ee82c6 100644 --- a/plugins/elements/gstfdsrc.c +++ b/plugins/elements/gstfdsrc.c @@ -26,6 +26,7 @@ #include #include #include +#include #ifdef HAVE_CONFIG_H # include "config.h" @@ -48,16 +49,19 @@ GstElementDetails gst_fdsrc_details = GST_ELEMENT_DETAILS ( /* FdSrc signals and args */ enum { - /* FILL ME */ + SIGNAL_TIMEOUT, LAST_SIGNAL }; enum { ARG_0, ARG_FD, - ARG_BLOCKSIZE + ARG_BLOCKSIZE, + ARG_TIMEOUT }; +static guint gst_fdsrc_signals[LAST_SIGNAL] = { 0 }; + #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_fdsrc_debug, "fdsrc", 0, "fdsrc element"); @@ -84,7 +88,6 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) GObjectClass *gobject_class; gobject_class = G_OBJECT_CLASS (klass); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_FD, g_param_spec_int ("fd", "fd", "An open file descriptor to read from", @@ -92,6 +95,14 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCKSIZE, g_param_spec_ulong ("blocksize", "Block size", "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, + g_param_spec_uint64 ("timeout", "Timeout", "Read timeout in nanoseconds", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); + + gst_fdsrc_signals[SIGNAL_TIMEOUT] = + g_signal_new ("timeout", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstFdSrcClass, timeout), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); gobject_class->set_property = gst_fdsrc_set_property; gobject_class->get_property = gst_fdsrc_get_property; @@ -106,6 +117,7 @@ static void gst_fdsrc_init(GstFdSrc *fdsrc) { fdsrc->fd = 0; fdsrc->curoffset = 0; fdsrc->blocksize = DEFAULT_BLOCKSIZE; + fdsrc->timeout = 0; fdsrc->seq = 0; } @@ -127,6 +139,9 @@ gst_fdsrc_set_property (GObject *object, guint prop_id, const GValue *value, GPa case ARG_BLOCKSIZE: src->blocksize = g_value_get_ulong (value); break; + case ARG_TIMEOUT: + src->timeout = g_value_get_uint64 (value); + break; default: break; } @@ -149,6 +164,9 @@ gst_fdsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe case ARG_FD: g_value_set_int (value, src->fd); break; + case ARG_TIMEOUT: + g_value_set_uint64 (value, src->timeout); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -161,32 +179,68 @@ gst_fdsrc_get(GstPad *pad) GstFdSrc *src; GstBuffer *buf; glong readbytes; + fd_set readfds; + struct timeval t, *tp = &t; + gint retval; src = GST_FDSRC (gst_pad_get_parent (pad)); /* create the buffer */ buf = gst_buffer_new_and_alloc (src->blocksize); - /* read it in from the file */ - readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + FD_ZERO (&readfds); + FD_SET (src->fd, &readfds); - /* if nothing was read, we're in eos */ - if (readbytes == 0) { + if (src->timeout != 0) + { + tp->tv_sec = src->timeout / 1000000000; + tp->tv_usec = src->timeout - t.tv_sec * 1000000000; + } + else + tp = NULL; + + do + { + retval = select (1, &readfds, NULL, NULL, tp); + } while (retval == -1 && errno == EINTR); /* retry if interrupted */ + + if (retval == -1) + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("select on file descriptor: %s.", g_strerror(errno))); gst_element_set_eos (GST_ELEMENT (src)); return GST_DATA (gst_event_new (GST_EVENT_EOS)); } + else if (retval == 0) + { + g_signal_emit (G_OBJECT (src), gst_fdsrc_signals[SIGNAL_TIMEOUT], 0); + gst_element_set_eos (GST_ELEMENT (src)); + return GST_DATA(gst_event_new (GST_EVENT_EOS)); + } - if (readbytes == -1) { - g_error ("Error reading from file descriptor. Ending stream.\n"); + do + { + readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + } while (readbytes == -1 && errno == EINTR); /* retry if interrupted */ + + if (readbytes > 0) + { + GST_BUFFER_OFFSET (buf) = src->curoffset; + GST_BUFFER_SIZE (buf) = readbytes; + GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; + src->curoffset += readbytes; + + /* we're done, return the buffer */ + return GST_DATA (buf); + } + else if (readbytes == 0) + { + gst_element_set_eos (GST_ELEMENT (src)); + return GST_DATA (gst_event_new (GST_EVENT_EOS)); + } + else + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("read on file descriptor: %s.", g_strerror(errno))); gst_element_set_eos (GST_ELEMENT (src)); return GST_DATA (gst_event_new (GST_EVENT_EOS)); } - - GST_BUFFER_OFFSET (buf) = src->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; - src->curoffset += readbytes; - - /* we're done, return the buffer */ - return GST_DATA (buf); } diff --git a/plugins/elements/gstfdsrc.h b/plugins/elements/gstfdsrc.h index b004e0ba4a..3bb9e7812e 100644 --- a/plugins/elements/gstfdsrc.h +++ b/plugins/elements/gstfdsrc.h @@ -52,14 +52,18 @@ struct _GstFdSrc { /* fd */ gint fd; - gulong curoffset; /* current offset in file */ - gulong blocksize; /* bytes per read */ - - gulong seq; /* buffer sequence number */ + gulong curoffset; /* current offset in file */ + gulong blocksize; /* bytes per read */ + guint64 timeout; /* read timeout, in nanoseconds */ + + gulong seq; /* buffer sequence number */ }; struct _GstFdSrcClass { GstElementClass parent_class; + + /* signals */ + void (*timeout) (GstElement *element); }; GType gst_fdsrc_get_type(void);