Implement zero-copy and make the buffer size configurable.

Original commit message from CVS:
Patch by: Wouter Cloetens <wouter at mind dot be>
* configure.ac:
* ext/soup/gstsouphttpsrc.c: (gst_soup_http_src_cancel_message),
(gst_soup_http_src_finished_cb), (gst_soup_http_src_chunk_free),
(gst_soup_http_src_chunk_allocator),
(gst_soup_http_src_got_chunk_cb), (gst_soup_http_src_create),
(gst_soup_http_src_start), (gst_soup_http_src_set_proxy):
* ext/soup/gstsouphttpsrc.h:
Implement zero-copy and make the buffer size configurable.
Prefix proxy URIs with "http://" if they don't start with it
already and catch errors earlier, fixes hanging in some situations.
Fixes bug #514948.
This commit is contained in:
Wouter Cloetens 2008-02-22 07:20:03 +00:00 committed by Sebastian Dröge
parent 2a424ead41
commit e63b9b56bf
3 changed files with 102 additions and 34 deletions

2
common

@ -1 +1 @@
Subproject commit 2a19465fdb43a75f4d32950fd2beb1beb950eec2
Subproject commit 135628f16d422584d3454fb9c9805e7be25760a1

View file

@ -124,7 +124,7 @@ enum
PROP_IRADIO_TITLE
};
#define DEFAULT_USER_AGENT "GStreamer souphttpsrc"
#define DEFAULT_USER_AGENT "GStreamer souphttpsrc "
static void gst_soup_http_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
@ -163,6 +163,9 @@ static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
static void gst_soup_http_src_parse_status (SoupMessage * msg,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_chunk_free (gpointer gstbuf);
static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
gsize max_len, gpointer user_data);
static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
SoupBuffer * chunk, GstSoupHTTPSrc * src);
static void gst_soup_http_src_response_cb (SoupSession * session,
@ -459,7 +462,8 @@ gst_soup_http_src_unicodify (const gchar * str)
static void
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
if (src->msg != NULL)
soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
src->msg = NULL;
}
@ -645,25 +649,88 @@ gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
GST_DEBUG_OBJECT (src, "finished, but not for current message");
return;
}
GST_DEBUG_OBJECT (src, "finished");
if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
/* Probably a redirect. */
return;
GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
("%s", msg->reason_phrase),
("libsoup status code %d", msg->status_code));
}
GST_DEBUG_OBJECT (src, "finished");
src->ret = GST_FLOW_UNEXPECTED;
if (src->loop)
g_main_loop_quit (src->loop);
}
/* Buffer lifecycle management.
*
* gst_soup_http_src_create() runs the GMainLoop for this element, to let
* Soup take control.
* A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
* associated with a SoupBuffer.
* Soup reads HTTP data in the GstBuffer's data buffer.
* The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
* That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
* increments its refcount (to 2), pauses the flow of data from the HTTP
* source to prevent gst_soup_http_src_got_chunk_cb() from being called
* again and breaks out of the GMainLoop.
* Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
* SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
* refcount (to 1).
* gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
* downstream element.
* If Soup fails to read HTTP data, it does not call
* gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
* calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
* refcount to 0, freeing it.
*/
static void
gst_soup_http_src_chunk_free (gpointer gstbuf)
{
gst_buffer_unref (GST_BUFFER_CAST (gstbuf));
}
static SoupBuffer *
gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
gpointer user_data)
{
GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
GstBuffer *gstbuf;
SoupBuffer *soupbuf;
gsize length;
GstFlowReturn rc;
if (max_len)
length = MIN (basesrc->blocksize, max_len);
else
length = basesrc->blocksize;
GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
length, max_len);
rc = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
GST_BUFFER_OFFSET_NONE, length,
GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), &gstbuf);
if (G_UNLIKELY (rc != GST_FLOW_OK)) {
/* Failed to allocate buffer. Stall SoupSession and return error code
* to create(). */
src->ret = rc;
g_main_loop_quit (src->loop);
return NULL;
}
soupbuf = soup_buffer_new_with_owner (GST_BUFFER_DATA (gstbuf), length,
gstbuf, gst_soup_http_src_chunk_free);
return soupbuf;
}
static void
gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
GstSoupHTTPSrc * src)
{
GstBaseSrc *basesrc;
guint64 new_position;
const char *data;
gsize length;
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
@ -675,22 +742,22 @@ gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
return;
}
basesrc = GST_BASE_SRC_CAST (src);
data = chunk->data;
length = chunk->length;
GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes", length);
GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
chunk->length);
/* Create the buffer. */
src->ret = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
basesrc->segment.last_stop, length,
GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), src->outbuf);
if (G_LIKELY (src->ret == GST_FLOW_OK)) {
memcpy (GST_BUFFER_DATA (*src->outbuf), data, length);
new_position = src->read_position + length;
if (G_LIKELY (src->request_position == src->read_position))
src->request_position = new_position;
src->read_position = new_position;
}
/* Extract the GstBuffer from the SoupBuffer and set its fields. */
*src->outbuf = GST_BUFFER_CAST (soup_buffer_get_owner (chunk));
gst_buffer_ref (*src->outbuf);
GST_BUFFER_SIZE (*src->outbuf) = chunk->length;
GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.last_stop;
gst_buffer_set_caps (*src->outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)));
new_position = src->read_position + chunk->length;
if (G_LIKELY (src->request_position == src->read_position))
src->request_position = new_position;
src->read_position = new_position;
src->ret = GST_FLOW_OK;
g_main_loop_quit (src->loop);
gst_soup_http_src_session_pause_message (src);
}
@ -789,10 +856,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
soup_message_headers_append (src->msg->request_headers, "Connection",
"close");
if (src->user_agent) {
soup_message_headers_append (src->msg->request_headers, "User-Agent",
src->user_agent);
}
if (src->iradio_mode) {
soup_message_headers_append (src->msg->request_headers, "icy-metadata",
"1");
@ -808,6 +871,8 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
(src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
soup_message_set_chunk_allocator (src->msg,
gst_soup_http_src_chunk_allocator, src, NULL);
gst_soup_http_src_add_range_header (src, src->request_position);
}
@ -828,10 +893,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_DEBUG_OBJECT (src, "Queueing connection request");
gst_soup_http_src_queue_message (src);
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED:
GST_DEBUG_OBJECT (src, "Connection closed");
gst_soup_http_src_cancel_message (src);
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
@ -874,11 +935,12 @@ gst_soup_http_src_start (GstBaseSrc * bsrc)
if (src->proxy == NULL)
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
src->context, NULL);
src->context, SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
else
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
src->context, SOUP_SESSION_PROXY_URI, src->proxy, NULL);
src->context, SOUP_SESSION_PROXY_URI, src->proxy,
SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
if (!src->session) {
GST_ELEMENT_ERROR (src, LIBRARY, INIT,
(NULL), ("Failed to create async session"));
@ -996,7 +1058,14 @@ gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, const gchar * uri)
soup_uri_free (src->proxy);
src->proxy = NULL;
}
src->proxy = soup_uri_new (uri);
if (g_str_has_prefix (uri, "http://")) {
src->proxy = soup_uri_new (uri);
} else {
gchar *new_uri = g_strconcat ("http://", uri, NULL);
src->proxy = soup_uri_new (new_uri);
g_free (new_uri);
}
return TRUE;
}

View file

@ -42,7 +42,6 @@ typedef enum {
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE,
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED,
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING,
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED,
} GstSoupHTTPSrcSessionIOStatus;
struct _GstSoupHTTPSrc {