diff --git a/ChangeLog b/ChangeLog index 586136a0c2..f26b7c7361 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2008-01-30 Sebastian Dröge + + * ext/gio/gstgiobasesink.c: (close_stream_cb), + (gst_gio_base_sink_stop), (gst_gio_base_sink_event), + (gst_gio_base_sink_render), (gst_gio_base_sink_set_stream): + * ext/gio/gstgiobasesrc.c: (close_stream_cb), + (gst_gio_base_src_stop), (gst_gio_base_src_create), + (gst_gio_base_src_set_stream): + Use async variants of the close stream functions to prevent blocking + for a long time there and add some more sanity checks for a correct + stream. + 2008-01-30 Sebastian Dröge * tests/icles/Makefile.am: diff --git a/ext/gio/gstgiobasesink.c b/ext/gio/gstgiobasesink.c index 1885b46302..f692fa139c 100644 --- a/ext/gio/gstgiobasesink.c +++ b/ext/gio/gstgiobasesink.c @@ -128,32 +128,41 @@ gst_gio_base_sink_start (GstBaseSink * base_sink) return TRUE; } +static void +close_stream_cb (GObject * object, GAsyncResult * res, gpointer user_data) +{ + GstGioBaseSink *sink = GST_GIO_BASE_SINK (user_data); + gboolean success; + GError *err = NULL; + + success = g_output_stream_close_finish (G_OUTPUT_STREAM (object), res, &err); + + if (!success + && !gst_gio_error (sink, "g_output_stream_close_async", &err, NULL)) { + GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL), + ("g_output_stream_close_async failed: %s", err->message)); + g_clear_error (&err); + } + + GST_DEBUG_OBJECT (sink, "closed stream"); + + g_object_unref (sink); +} + static gboolean gst_gio_base_sink_stop (GstBaseSink * base_sink) { GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink); - gboolean success = TRUE; - GError *err = NULL; if (G_IS_OUTPUT_STREAM (sink->stream)) { - /* FIXME: In case that the call below would block, there is no one to - * trigger the cancellation! */ - - success = g_output_stream_close (sink->stream, sink->cancel, &err); - - if (success) { - GST_DEBUG_OBJECT (sink, "closed stream"); - } else if (!gst_gio_error (sink, "g_output_stream_close", &err, NULL)) { - GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL), - ("g_output_stream_close failed: %s", err->message)); - g_clear_error (&err); - } - + GST_DEBUG_OBJECT (sink, "closing stream"); + g_output_stream_close_async (sink->stream, 0, sink->cancel, close_stream_cb, + g_object_ref (sink)); g_object_unref (sink->stream); sink->stream = NULL; } - return success; + return TRUE; } static gboolean @@ -191,45 +200,45 @@ gst_gio_base_sink_event (GstBaseSink * base_sink, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_NEWSEGMENT: - { - GstFormat format; - gint64 offset; + if (G_IS_OUTPUT_STREAM (sink->stream)) { + GstFormat format; + gint64 offset; - gst_event_parse_new_segment (event, NULL, NULL, &format, &offset, NULL, - NULL); + gst_event_parse_new_segment (event, NULL, NULL, &format, &offset, NULL, + NULL); - if (format != GST_FORMAT_BYTES) { - GST_WARNING_OBJECT (sink, "ignored NEWSEGMENT event in %s format", - gst_format_get_name (format)); - break; + if (format != GST_FORMAT_BYTES) { + GST_WARNING_OBJECT (sink, "ignored NEWSEGMENT event in %s format", + gst_format_get_name (format)); + break; + } + + if (GST_GIO_STREAM_IS_SEEKABLE (sink->stream)) { + ret = gst_gio_seek (sink, G_SEEKABLE (sink->stream), offset, + sink->cancel); + if (ret == GST_FLOW_OK) + sink->position = offset; + } else { + ret = GST_FLOW_NOT_SUPPORTED; + } } - - if (GST_GIO_STREAM_IS_SEEKABLE (sink->stream)) { - ret = gst_gio_seek (sink, G_SEEKABLE (sink->stream), offset, - sink->cancel); - if (ret == GST_FLOW_OK) - sink->position = offset; - } else { - ret = GST_FLOW_NOT_SUPPORTED; - } - } break; case GST_EVENT_EOS: case GST_EVENT_FLUSH_START: - { - gboolean success; - GError *err = NULL; + if (G_IS_OUTPUT_STREAM (sink->stream)) { + gboolean success; + GError *err = NULL; - success = g_output_stream_flush (sink->stream, sink->cancel, &err); + success = g_output_stream_flush (sink->stream, sink->cancel, &err); - if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err, - &ret)) { - GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), - ("flush failed: %s", err->message)); - g_clear_error (&err); + if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err, + &ret)) { + GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), + ("flush failed: %s", err->message)); + g_clear_error (&err); + } } - } break; default: @@ -247,6 +256,8 @@ gst_gio_base_sink_render (GstBaseSink * base_sink, GstBuffer * buffer) gboolean success; GError *err = NULL; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (sink->stream), GST_FLOW_ERROR); + GST_LOG_OBJECT (sink, "writing %u bytes to offset %" G_GUINT64_FORMAT, GST_BUFFER_SIZE (buffer), sink->position); @@ -314,18 +325,10 @@ gst_gio_base_sink_set_stream (GstGioBaseSink * sink, GOutputStream * stream) g_return_if_fail ((GST_STATE (sink) != GST_STATE_PLAYING && GST_STATE (sink) != GST_STATE_PAUSED)); - if (sink->stream) { - GError *err = NULL; - gboolean success = g_output_stream_close (sink->stream, sink->cancel, &err); - - if (success) { - GST_DEBUG_OBJECT (sink, "closed stream"); - } else if (!gst_gio_error (sink, "g_output_stream_close", &err, NULL)) { - GST_WARNING_OBJECT (sink, "g_output_stream_close failed: %s", - err->message); - g_clear_error (&err); - } - + if (G_IS_OUTPUT_STREAM (sink->stream)) { + GST_DEBUG_OBJECT (sink, "closing old stream"); + g_output_stream_close_async (sink->stream, 0, sink->cancel, close_stream_cb, + g_object_ref (sink)); g_object_unref (sink->stream); sink->stream = NULL; } diff --git a/ext/gio/gstgiobasesrc.c b/ext/gio/gstgiobasesrc.c index 019f8ee4d0..ec9f2fbc50 100644 --- a/ext/gio/gstgiobasesrc.c +++ b/ext/gio/gstgiobasesrc.c @@ -128,32 +128,41 @@ gst_gio_base_src_start (GstBaseSrc * base_src) return TRUE; } -static gboolean -gst_gio_base_src_stop (GstBaseSrc * base_src) +static void +close_stream_cb (GObject * object, GAsyncResult * res, gpointer user_data) { - GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src); - gboolean success = TRUE; + GstGioBaseSrc *src = GST_GIO_BASE_SRC (user_data); + gboolean success; GError *err = NULL; - if (src->stream != NULL) { - /* FIXME: In case that the call below would block, there is no one to - * trigger the cancellation! */ + success = g_input_stream_close_finish (G_INPUT_STREAM (object), res, &err); - success = g_input_stream_close (src->stream, src->cancel, &err); - - if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) { - GST_ELEMENT_ERROR (src, RESOURCE, CLOSE, (NULL), - ("g_input_stream_close failed: %s", err->message)); - g_clear_error (&err); - } - - g_object_unref (src->stream); - src->stream = NULL; + if (!success + && !gst_gio_error (src, "g_input_stream_close_async", &err, NULL)) { + GST_ELEMENT_ERROR (src, RESOURCE, CLOSE, (NULL), + ("g_input_stream_close_async failed: %s", err->message)); + g_clear_error (&err); } GST_DEBUG_OBJECT (src, "closed stream"); - return success; + g_object_unref (src); +} + +static gboolean +gst_gio_base_src_stop (GstBaseSrc * base_src) +{ + GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src); + + if (G_IS_INPUT_STREAM (src->stream)) { + GST_DEBUG_OBJECT (src, "closing stream"); + g_input_stream_close_async (src->stream, 0, src->cancel, close_stream_cb, + g_object_ref (src)); + g_object_unref (src->stream); + src->stream = NULL; + } + + return TRUE; } static gboolean @@ -295,6 +304,8 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size, GstFlowReturn ret = GST_FLOW_OK; GError *err = NULL; + g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR); + if (G_UNLIKELY (offset != src->position)) { if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream)) return GST_FLOW_NOT_SUPPORTED; @@ -347,17 +358,10 @@ gst_gio_base_src_set_stream (GstGioBaseSrc * src, GInputStream * stream) g_return_if_fail ((GST_STATE (src) != GST_STATE_PLAYING && GST_STATE (src) != GST_STATE_PAUSED)); - if (src->stream) { - gboolean success; - GError *err = NULL; - - success = g_input_stream_close (src->stream, src->cancel, &err); - - if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) { - GST_WARNING_OBJECT (src, "g_input_stream_close failed: %s", err->message); - g_clear_error (&err); - } - + if (G_IS_INPUT_STREAM (src->stream)) { + GST_DEBUG_OBJECT (src, "closing old stream"); + g_input_stream_close_async (src->stream, 0, src->cancel, close_stream_cb, + g_object_ref (src)); g_object_unref (src->stream); src->stream = NULL; }