ext/gio/: Use async variants of the close stream functions to prevent blocking for a long time there and add some mor...

Original commit message from CVS:
* ext/gio/gstgiobasesink.c: (close_stream_cb),
(gst_gio_base_sink_stop), (gst_gio_base_sink_event),
(gst_gio_base_sink_render), (gst_gio_base_sink_set_stream):
* ext/gio/gstgiobasesrc.c: (close_stream_cb),
(gst_gio_base_src_stop), (gst_gio_base_src_create),
(gst_gio_base_src_set_stream):
Use async variants of the close stream functions to prevent blocking
for a long time there and add some more sanity checks for a correct
stream.
This commit is contained in:
Sebastian Dröge 2008-01-30 15:34:25 +00:00
parent de214f851c
commit 891e88c581
2 changed files with 93 additions and 86 deletions

View file

@ -128,32 +128,41 @@ gst_gio_base_sink_start (GstBaseSink * base_sink)
return TRUE; return TRUE;
} }
static void
close_stream_cb (GObject * object, GAsyncResult * res, gpointer user_data)
{
GstGioBaseSink *sink = GST_GIO_BASE_SINK (user_data);
gboolean success;
GError *err = NULL;
success = g_output_stream_close_finish (G_OUTPUT_STREAM (object), res, &err);
if (!success
&& !gst_gio_error (sink, "g_output_stream_close_async", &err, NULL)) {
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
("g_output_stream_close_async failed: %s", err->message));
g_clear_error (&err);
}
GST_DEBUG_OBJECT (sink, "closed stream");
g_object_unref (sink);
}
static gboolean static gboolean
gst_gio_base_sink_stop (GstBaseSink * base_sink) gst_gio_base_sink_stop (GstBaseSink * base_sink)
{ {
GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink); GstGioBaseSink *sink = GST_GIO_BASE_SINK (base_sink);
gboolean success = TRUE;
GError *err = NULL;
if (G_IS_OUTPUT_STREAM (sink->stream)) { if (G_IS_OUTPUT_STREAM (sink->stream)) {
/* FIXME: In case that the call below would block, there is no one to GST_DEBUG_OBJECT (sink, "closing stream");
* trigger the cancellation! */ g_output_stream_close_async (sink->stream, 0, sink->cancel, close_stream_cb,
g_object_ref (sink));
success = g_output_stream_close (sink->stream, sink->cancel, &err);
if (success) {
GST_DEBUG_OBJECT (sink, "closed stream");
} else if (!gst_gio_error (sink, "g_output_stream_close", &err, NULL)) {
GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
("g_output_stream_close failed: %s", err->message));
g_clear_error (&err);
}
g_object_unref (sink->stream); g_object_unref (sink->stream);
sink->stream = NULL; sink->stream = NULL;
} }
return success; return TRUE;
} }
static gboolean static gboolean
@ -191,45 +200,45 @@ gst_gio_base_sink_event (GstBaseSink * base_sink, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
{ if (G_IS_OUTPUT_STREAM (sink->stream)) {
GstFormat format; GstFormat format;
gint64 offset; gint64 offset;
gst_event_parse_new_segment (event, NULL, NULL, &format, &offset, NULL, gst_event_parse_new_segment (event, NULL, NULL, &format, &offset, NULL,
NULL); NULL);
if (format != GST_FORMAT_BYTES) { if (format != GST_FORMAT_BYTES) {
GST_WARNING_OBJECT (sink, "ignored NEWSEGMENT event in %s format", GST_WARNING_OBJECT (sink, "ignored NEWSEGMENT event in %s format",
gst_format_get_name (format)); gst_format_get_name (format));
break; break;
}
if (GST_GIO_STREAM_IS_SEEKABLE (sink->stream)) {
ret = gst_gio_seek (sink, G_SEEKABLE (sink->stream), offset,
sink->cancel);
if (ret == GST_FLOW_OK)
sink->position = offset;
} else {
ret = GST_FLOW_NOT_SUPPORTED;
}
} }
if (GST_GIO_STREAM_IS_SEEKABLE (sink->stream)) {
ret = gst_gio_seek (sink, G_SEEKABLE (sink->stream), offset,
sink->cancel);
if (ret == GST_FLOW_OK)
sink->position = offset;
} else {
ret = GST_FLOW_NOT_SUPPORTED;
}
}
break; break;
case GST_EVENT_EOS: case GST_EVENT_EOS:
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
{ if (G_IS_OUTPUT_STREAM (sink->stream)) {
gboolean success; gboolean success;
GError *err = NULL; GError *err = NULL;
success = g_output_stream_flush (sink->stream, sink->cancel, &err); success = g_output_stream_flush (sink->stream, sink->cancel, &err);
if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err, if (!success && !gst_gio_error (sink, "g_output_stream_flush", &err,
&ret)) { &ret)) {
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
("flush failed: %s", err->message)); ("flush failed: %s", err->message));
g_clear_error (&err); g_clear_error (&err);
}
} }
}
break; break;
default: default:
@ -247,6 +256,8 @@ gst_gio_base_sink_render (GstBaseSink * base_sink, GstBuffer * buffer)
gboolean success; gboolean success;
GError *err = NULL; GError *err = NULL;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (sink->stream), GST_FLOW_ERROR);
GST_LOG_OBJECT (sink, "writing %u bytes to offset %" G_GUINT64_FORMAT, GST_LOG_OBJECT (sink, "writing %u bytes to offset %" G_GUINT64_FORMAT,
GST_BUFFER_SIZE (buffer), sink->position); GST_BUFFER_SIZE (buffer), sink->position);
@ -314,18 +325,10 @@ gst_gio_base_sink_set_stream (GstGioBaseSink * sink, GOutputStream * stream)
g_return_if_fail ((GST_STATE (sink) != GST_STATE_PLAYING && g_return_if_fail ((GST_STATE (sink) != GST_STATE_PLAYING &&
GST_STATE (sink) != GST_STATE_PAUSED)); GST_STATE (sink) != GST_STATE_PAUSED));
if (sink->stream) { if (G_IS_OUTPUT_STREAM (sink->stream)) {
GError *err = NULL; GST_DEBUG_OBJECT (sink, "closing old stream");
gboolean success = g_output_stream_close (sink->stream, sink->cancel, &err); g_output_stream_close_async (sink->stream, 0, sink->cancel, close_stream_cb,
g_object_ref (sink));
if (success) {
GST_DEBUG_OBJECT (sink, "closed stream");
} else if (!gst_gio_error (sink, "g_output_stream_close", &err, NULL)) {
GST_WARNING_OBJECT (sink, "g_output_stream_close failed: %s",
err->message);
g_clear_error (&err);
}
g_object_unref (sink->stream); g_object_unref (sink->stream);
sink->stream = NULL; sink->stream = NULL;
} }

View file

@ -128,32 +128,41 @@ gst_gio_base_src_start (GstBaseSrc * base_src)
return TRUE; return TRUE;
} }
static gboolean static void
gst_gio_base_src_stop (GstBaseSrc * base_src) close_stream_cb (GObject * object, GAsyncResult * res, gpointer user_data)
{ {
GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src); GstGioBaseSrc *src = GST_GIO_BASE_SRC (user_data);
gboolean success = TRUE; gboolean success;
GError *err = NULL; GError *err = NULL;
if (src->stream != NULL) { success = g_input_stream_close_finish (G_INPUT_STREAM (object), res, &err);
/* FIXME: In case that the call below would block, there is no one to
* trigger the cancellation! */
success = g_input_stream_close (src->stream, src->cancel, &err); if (!success
&& !gst_gio_error (src, "g_input_stream_close_async", &err, NULL)) {
if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) { GST_ELEMENT_ERROR (src, RESOURCE, CLOSE, (NULL),
GST_ELEMENT_ERROR (src, RESOURCE, CLOSE, (NULL), ("g_input_stream_close_async failed: %s", err->message));
("g_input_stream_close failed: %s", err->message)); g_clear_error (&err);
g_clear_error (&err);
}
g_object_unref (src->stream);
src->stream = NULL;
} }
GST_DEBUG_OBJECT (src, "closed stream"); GST_DEBUG_OBJECT (src, "closed stream");
return success; g_object_unref (src);
}
static gboolean
gst_gio_base_src_stop (GstBaseSrc * base_src)
{
GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
if (G_IS_INPUT_STREAM (src->stream)) {
GST_DEBUG_OBJECT (src, "closing stream");
g_input_stream_close_async (src->stream, 0, src->cancel, close_stream_cb,
g_object_ref (src));
g_object_unref (src->stream);
src->stream = NULL;
}
return TRUE;
} }
static gboolean static gboolean
@ -295,6 +304,8 @@ gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
GError *err = NULL; GError *err = NULL;
g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
if (G_UNLIKELY (offset != src->position)) { if (G_UNLIKELY (offset != src->position)) {
if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream)) if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
return GST_FLOW_NOT_SUPPORTED; return GST_FLOW_NOT_SUPPORTED;
@ -347,17 +358,10 @@ gst_gio_base_src_set_stream (GstGioBaseSrc * src, GInputStream * stream)
g_return_if_fail ((GST_STATE (src) != GST_STATE_PLAYING && g_return_if_fail ((GST_STATE (src) != GST_STATE_PLAYING &&
GST_STATE (src) != GST_STATE_PAUSED)); GST_STATE (src) != GST_STATE_PAUSED));
if (src->stream) { if (G_IS_INPUT_STREAM (src->stream)) {
gboolean success; GST_DEBUG_OBJECT (src, "closing old stream");
GError *err = NULL; g_input_stream_close_async (src->stream, 0, src->cancel, close_stream_cb,
g_object_ref (src));
success = g_input_stream_close (src->stream, src->cancel, &err);
if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
GST_WARNING_OBJECT (src, "g_input_stream_close failed: %s", err->message);
g_clear_error (&err);
}
g_object_unref (src->stream); g_object_unref (src->stream);
src->stream = NULL; src->stream = NULL;
} }