Removed not need thread and changed to push out blocksize bytes.

Original commit message from CVS:
Removed not need thread and changed to push out blocksize bytes.
This commit is contained in:
Edgard Lima 2006-01-04 13:26:35 +00:00
parent 79b2f35fdc
commit 565a17b6a4
4 changed files with 80 additions and 138 deletions

View file

@ -1,3 +1,9 @@
2006-01-04 Edgard Lima <edgard.lima@indt.org.br>
* ext/neon/gstneonhttpsrc.c:
* ext/neon/gstneonhttpsrc.h:
Removed not need thread and changed to push out blocksize bytes.
2005-12-28 Edgard Lima <edgard.lima@indt.org.br>
* configure.ac:

2
common

@ -1 +1 @@
Subproject commit d1911d4b3d6267f9cd9dfb68fcef2afe4d098092
Subproject commit 5f10c872cafb3eb8058d63e438cae029ed9e8d73

View file

@ -49,11 +49,8 @@ enum
PROP_PROXY
};
static void request_dispatch (void *data);
static void oom_callback ();
static int accept_response (void *userdata, ne_request * req,
const ne_status * st);
static void block_reader (void *userdata, const char *buf, size_t len);
static void size_header_handler (void *userdata, const char *value);
static gboolean set_proxy (const char *uri, ne_uri * parsed,
@ -165,11 +162,7 @@ gst_neonhttp_src_init (GstNeonhttpSrc * this, GstNeonhttpSrcClass * g_class)
set_uri (NULL, &this->uri, &this->ishttps, &this->uristr, TRUE);
set_proxy (NULL, &this->proxy, TRUE);
this->lock = g_mutex_new ();
this->adapter = gst_adapter_new ();
this->task = gst_task_create (request_dispatch, this);
g_static_rec_mutex_init (&this->tasklock);
gst_task_set_lock (this->task, &this->tasklock);
gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
@ -200,22 +193,80 @@ gst_neonhttp_src_finalize (GObject * gobject)
g_object_unref (this->adapter);
}
if (this->lock) {
g_mutex_free (this->lock);
if (this->uristr) {
ne_free (this->uristr);
}
gst_object_unref (this->task);
}
g_static_rec_mutex_free (&this->tasklock);
int
request_dispatch (GstNeonhttpSrc * src, GstBuffer * outbuf)
{
GstPad *peer;
int ret;
int read = 0;
int sizetoread = GST_BUFFER_SIZE (outbuf);
/* Loop sending the request:
* Retry whilst authentication fails and we supply it. */
ssize_t len = 0;
while (sizetoread > 0) {
if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
GST_BUFFER_SIZE (outbuf) = read;
return read;
}
len = ne_read_response_block (src->request,
(char *) GST_BUFFER_DATA (outbuf) + read, sizetoread);
if (len > 0) {
read += len;
sizetoread -= len;
} else {
break;
}
}
GST_BUFFER_SIZE (outbuf) = read;
if (len < 0) {
read = -2;
goto done;
} else if (len == 0) {
ret = ne_end_request (src->request);
if (ret != NE_RETRY) {
if (ret == NE_OK) {
GST_DEBUG ("Returning EOS");
peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
ret = GST_FLOW_ERROR;
}
gst_object_unref (peer);
} else {
read = -3;
GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
ne_get_error (src->session));
}
}
goto done;
}
done:
return read;
}
static GstFlowReturn
gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstNeonhttpSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
guint avail;
int read;
src = GST_NEONHTTP_SRC (psrc);
@ -224,46 +275,19 @@ gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_LOG_OBJECT (src, "asked for a buffer");
while (1) {
g_mutex_lock (src->lock);
if ((avail = gst_adapter_available (src->adapter))) {
g_mutex_unlock (src->lock);
break;
} else if (src->eos) {
GstPad *peer;
*outbuf = gst_buffer_new_and_alloc (GST_BASE_SRC (psrc)->blocksize);
g_mutex_unlock (src->lock);
read = request_dispatch (src, *outbuf);
if (read > 0) {
*outbuf = NULL;
GST_DEBUG ("Returning EOS");
peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
ret = GST_FLOW_ERROR;
}
gst_object_unref (peer);
goto done;
if (*outbuf) {
gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
}
g_mutex_unlock (src->lock);
usleep (250000);
} else if (read < 0) {
return GST_FLOW_ERROR;
}
g_mutex_lock (src->lock);
avail = gst_adapter_available (src->adapter);
avail = avail > (4 * 1024) ? (4 * 1024) : avail;
*outbuf = gst_buffer_new_and_alloc (avail);
memcpy (GST_BUFFER_DATA (*outbuf), gst_adapter_peek (src->adapter, avail),
avail);
gst_adapter_flush (src->adapter, avail);
g_mutex_unlock (src->lock);
if (*outbuf) {
gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
}
done:
return ret;
wrong_state:
@ -516,17 +540,12 @@ gst_neonhttp_src_start (GstBaseSrc * bsrc)
ne_add_response_header_handler (src->request, "Content-Length",
size_header_handler, src);
ne_add_response_body_reader (src->request, accept_response, block_reader,
src);
if (NE_OK != ne_begin_request (src->request)) {
ret = FALSE;
goto done;
}
src->eos = FALSE;
GST_OBJECT_FLAG_SET (src, GST_NEONHTTP_SRC_OPEN);
gst_task_start (src->task);
done:
@ -546,9 +565,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
GST_OBJECT_FLAG_UNSET (src, GST_NEONHTTP_SRC_OPEN);
gst_task_stop (src->task);
gst_task_join (src->task);
if (src->request) {
ne_request_destroy (src->request);
src->request = NULL;
@ -563,54 +579,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
return TRUE;
}
void
request_dispatch (void *data)
{
int ret;
GstNeonhttpSrc *src;
src = GST_NEONHTTP_SRC (data);
/* Loop sending the request:
* Retry whilst authentication fails and we supply it. */
do {
ssize_t len;
do {
if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
return;
}
len = ne_read_response_block (src->request,
src->respbuf, sizeof (src->respbuf));
} while (len > 0);
if (len < 0) {
ret = NE_ERROR;
break;
}
ret = ne_end_request (src->request);
} while (ret == NE_RETRY);
if (ret != NE_OK) {
GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
ne_get_error (src->session));
}
g_mutex_lock (src->lock);
src->eos = TRUE;
g_mutex_unlock (src->lock);
gst_task_stop (src->task);
return;
}
/* entry point to initialize the plug-in
* initialize the plug-in itself
* register the element factories and pad templates
@ -682,39 +650,11 @@ oom_callback ()
GST_ERROR ("memory exeception in neon\n");
}
static int
accept_response (void *userdata, ne_request * req, const ne_status * st)
{
GST_LOG ("ne_accept_response called code = %d phrase %s\n", st->code,
st->reason_phrase);
return ne_accept_2xx (userdata, req, st);
}
static void
block_reader (void *userdata, const char *buf, size_t len)
{
if (len) {
GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
GstBuffer *buffer = gst_buffer_new_and_alloc (len);
memcpy (GST_BUFFER_DATA (buffer), buf, len);
g_mutex_lock (src->lock);
gst_adapter_push (src->adapter, buffer);
src->current_size += len;
g_mutex_unlock (src->lock);
}
}
void
size_header_handler (void *userdata, const char *value)
{
GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
g_mutex_lock (src->lock);
src->content_size = atoi (value);
g_mutex_unlock (src->lock);
}

View file

@ -62,12 +62,8 @@ struct _GstNeonhttpSrc {
gint current_size;
GstAdapter *adapter;
GMutex *lock;
GstTask *task;
GStaticRecMutex tasklock;
gboolean eos;
char respbuf[BUFSIZ];
gboolean eos;
};