socketsrc: Refactor to simplify

* Don't bother polling, just do a blocking read, the `GCancellable` will
  take care of unlocking.  This should also be faster on MS Windows where
  the GIO documentation for `g_socket_get_available_bytes` states: "Note
  that on Windows, this function is rather inefficient in the UDP case".

* Implement `GstPushSrc.fill` rather than `GstPushSrc.create`.  This means
  that we will be using the downstream allocator which may be more
  efficient.  It also means that socketsrc is likely to respect its
  "blocksize" property (assuming that there is enough data available).

See https://bugzilla.gnome.org/show_bug.cgi?id=739546
This commit is contained in:
William Manley 2015-03-13 13:30:48 +00:00 committed by Wim Taymans
parent 7c10499ecd
commit 0c054aa00d

View file

@ -76,8 +76,8 @@ G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
static void gst_socket_src_finalize (GObject * gobject);
static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc,
GstBuffer ** outbuf);
static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
GstBuffer * outbuf);
static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
@ -120,7 +120,7 @@ gst_socket_src_class_init (GstSocketSrcClass * klass)
gstbasesrc_class->unlock = gst_socket_src_unlock;
gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
gstpush_src_class->create = gst_socket_src_create;
gstpush_src_class->fill = gst_socket_src_fill;
GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
}
@ -148,14 +148,13 @@ gst_socket_src_finalize (GObject * gobject)
}
static GstFlowReturn
gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
{
GstSocketSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
gssize rret;
GError *err = NULL;
GstMapInfo map;
gssize avail, read;
GSocket *socket;
src = GST_SOCKET_SRC (psrc);
@ -173,60 +172,14 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header */
avail = g_socket_get_available_bytes (socket);
if (avail < 0) {
goto get_available_error;
} else if (avail == 0) {
GIOCondition condition;
if (!g_socket_condition_wait (socket,
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
goto select_error;
condition =
g_socket_condition_check (socket,
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
if ((condition & G_IO_ERR)) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Socket in error state"));
*outbuf = NULL;
ret = GST_FLOW_ERROR;
goto done;
} else if ((condition & G_IO_HUP)) {
GST_DEBUG_OBJECT (src, "Connection closed");
*outbuf = NULL;
ret = GST_FLOW_EOS;
goto done;
}
avail = g_socket_get_available_bytes (socket);
if (avail < 0)
goto get_available_error;
}
if (avail > 0) {
read = MIN (avail, MAX_READ_SIZE);
*outbuf = gst_buffer_new_and_alloc (read);
gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
rret =
g_socket_receive (socket, (gchar *) map.data, read,
src->cancellable, &err);
} else {
/* Connection closed */
*outbuf = NULL;
read = 0;
rret = 0;
}
gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
rret = g_socket_receive_with_blocking (socket, (gchar *) map.data,
map.size, TRUE, src->cancellable, &err);
gst_buffer_unmap (outbuf, &map);
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
if (*outbuf) {
gst_buffer_unmap (*outbuf, &map);
gst_buffer_unref (*outbuf);
}
*outbuf = NULL;
} else if (rret < 0) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
ret = GST_FLOW_FLUSHING;
@ -236,50 +189,24 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
gst_buffer_unmap (*outbuf, &map);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else {
ret = GST_FLOW_OK;
gst_buffer_unmap (*outbuf, &map);
gst_buffer_resize (*outbuf, 0, rret);
gst_buffer_resize (outbuf, 0, rret);
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
gst_buffer_get_size (*outbuf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
gst_buffer_get_size (outbuf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
}
g_clear_error (&err);
done:
g_object_unref (socket);
return ret;
select_error:
{
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (src, "Cancelled");
ret = GST_FLOW_FLUSHING;
} else {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Select failed: %s", err->message));
ret = GST_FLOW_ERROR;
}
g_clear_error (&err);
g_object_unref (socket);
return ret;
}
get_available_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to get available bytes from socket"));
g_object_unref (socket);
return GST_FLOW_ERROR;
}
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),