rtspconnection: Handle closed POST socket in tunneling

Catch more socket errors.
Rework how sockets are managed in the GSource, wake up the maincontext instead
of adding/removing the sockets from the source.
Add callback for when the tunnel connection is lost. Some clients (Quicktime
Player) close the POST connection in tunneled mode and reopen the socket when
needed.

See #612915
This commit is contained in:
Wim Taymans 2010-04-06 10:55:42 +02:00
parent f2a0dd38b0
commit 318fbf3310
2 changed files with 100 additions and 45 deletions

View file

@ -1834,7 +1834,7 @@ normalize_line (guint8 * buffer)
/* returns:
* GST_RTSP_OK when a complete message was read.
* GST_RTSP_EEOF: when the socket is closed
* GST_RTSP_EEOF: when the read socket is closed
* GST_RTSP_EINTR: when more data is needed.
* GST_RTSP_..: some other error occured.
*/
@ -3006,8 +3006,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
return GST_RTSP_OK;
}
#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR)
#define WRITE_COND (G_IO_OUT | G_IO_ERR)
#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
#define READ_COND (G_IO_IN | READ_ERR)
#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
#define WRITE_COND (G_IO_OUT | WRITE_ERR)
typedef struct
{
@ -3028,7 +3030,6 @@ struct _GstRTSPWatch
GPollFD readfd;
GPollFD writefd;
gboolean write_added;
/* queued message for transmission */
guint id;
@ -3077,17 +3078,35 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
gpointer user_data G_GNUC_UNUSED)
{
GstRTSPWatch *watch = (GstRTSPWatch *) source;
GstRTSPResult res;
GstRTSPResult res = GST_RTSP_ERROR;
gboolean keep_running = TRUE;
/* first read as much as we can */
if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
do {
if (watch->readfd.revents & READ_ERR)
goto read_error;
res = build_next (&watch->builder, &watch->message, watch->conn);
if (res == GST_RTSP_EINTR)
break;
else if (G_UNLIKELY (res == GST_RTSP_EEOF))
goto eof;
else if (G_LIKELY (res == GST_RTSP_OK)) {
else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
watch->readfd.events = 0;
watch->readfd.revents = 0;
g_source_remove_poll ((GSource *) watch, &watch->readfd);
/* When we are in tunnelled mode, the read socket can be closed and we
* should be prepared for a new POST method to reopen it */
if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
/* remove the read connection for the tunnel */
/* we accept a new POST request */
watch->conn->tstate = TUNNEL_STATE_GET;
/* and signal that we lost our tunnel */
if (watch->funcs.tunnel_lost)
res = watch->funcs.tunnel_lost (watch, watch->user_data);
goto read_done;
} else
goto eof;
} else if (G_LIKELY (res == GST_RTSP_OK)) {
if (!watch->conn->manual_http &&
watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@ -3144,11 +3163,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
watch->funcs.message_received (watch, &watch->message,
watch->user_data);
} else {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
0, watch->user_data), error);
else
goto error;
goto read_error;
}
read_done:
@ -3158,6 +3173,9 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
}
if (watch->writefd.revents & WRITE_COND) {
if (watch->writefd.revents & WRITE_ERR)
goto write_error;
g_mutex_lock (watch->mutex);
do {
if (watch->write_data == NULL) {
@ -3166,7 +3184,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
/* get a new message from the queue */
rec = g_queue_pop_tail (watch->messages);
if (rec == NULL)
goto done;
break;
watch->write_off = 0;
watch->write_data = rec->data;
@ -3179,17 +3197,14 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
res = write_bytes (watch->writefd.fd, watch->write_data,
&watch->write_off, watch->write_size);
g_mutex_unlock (watch->mutex);
if (res == GST_RTSP_EINTR)
goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) {
if (watch->funcs.message_sent)
watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
} else {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
watch->write_id, watch->user_data), error);
else
goto error;
goto write_error;
}
g_mutex_lock (watch->mutex);
@ -3197,31 +3212,61 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
watch->write_data = NULL;
} while (TRUE);
done:
if (watch->write_added) {
g_source_remove_poll ((GSource *) watch, &watch->writefd);
watch->write_added = FALSE;
watch->writefd.revents = 0;
}
watch->writefd.events = WRITE_ERR;
g_mutex_unlock (watch->mutex);
}
write_blocked:
return TRUE;
return keep_running;
/* ERRORS */
eof:
{
if (watch->funcs.closed)
watch->funcs.closed (watch, watch->user_data);
/* always stop when the readfd returns EOF in non-tunneled mode */
return FALSE;
}
read_error:
{
watch->readfd.events = 0;
watch->readfd.revents = 0;
g_source_remove_poll ((GSource *) watch, &watch->readfd);
keep_running = (watch->writefd.events != 0);
if (keep_running) {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
0, watch->user_data), error);
else
goto error;
} else
goto eof;
}
write_error:
{
watch->writefd.events = 0;
watch->writefd.revents = 0;
g_source_remove_poll ((GSource *) watch, &watch->writefd);
keep_running = (watch->readfd.events != 0);
if (keep_running) {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
watch->write_id, watch->user_data), error);
else
goto error;
} else
goto eof;
}
error:
{
if (watch->funcs.error)
watch->funcs.error (watch, res, watch->user_data);
return FALSE;
return keep_running;
}
}
@ -3245,11 +3290,10 @@ gst_rtsp_source_finalize (GSource * source)
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL;
g_free (watch->write_data);
g_mutex_free (watch->mutex);
g_free (watch->write_data);
if (watch->notify)
watch->notify (watch->user_data);
}
@ -3312,10 +3356,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->user_data = user_data;
result->notify = notify;
/* only add the read fd, the write fd is only added when we have data
* to send. */
g_source_add_poll ((GSource *) result, &result->readfd);
return result;
}
@ -3341,11 +3381,13 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
watch->readfd.revents = 0;
watch->writefd.fd = watch->conn->writefd->fd;
watch->writefd.events = WRITE_COND;
watch->writefd.events = WRITE_ERR;
watch->writefd.revents = 0;
watch->write_added = FALSE;
g_source_add_poll ((GSource *) watch, &watch->readfd);
if (watch->readfd.fd != -1)
g_source_add_poll ((GSource *) watch, &watch->readfd);
if (watch->writefd.fd != -1)
g_source_add_poll ((GSource *) watch, &watch->writefd);
}
/**
@ -3411,6 +3453,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
GstRTSPResult res;
GstRTSPRec *rec;
guint off = 0;
GMainContext *context = NULL;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@ -3418,6 +3461,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
g_mutex_lock (watch->mutex);
/* try to send the message synchronously first */
if (watch->messages->length == 0) {
res = write_bytes (watch->writefd.fd, data, &off, size);
if (res != GST_RTSP_EINTR) {
@ -3428,7 +3472,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
}
}
/* make a record with the data and id */
/* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
rec->data = (guint8 *) data;
@ -3449,9 +3493,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
g_source_add_poll ((GSource *) watch, &watch->writefd);
watch->write_added = TRUE;
if (watch->writefd.events != WRITE_COND) {
watch->writefd.events = WRITE_COND;
context = ((GSource *) watch)->context;
}
if (id != NULL)
@ -3460,6 +3504,10 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
done:
g_mutex_unlock (watch->mutex);
if (context)
g_main_context_wakeup (context);
return res;
}
@ -3528,6 +3576,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
guint size)
{
GstRTSPRec *rec;
GMainContext *context = NULL;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@ -3549,12 +3598,15 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
g_source_add_poll ((GSource *) watch, &watch->writefd);
watch->write_added = TRUE;
if (watch->writefd.events != WRITE_COND) {
watch->writefd.events = WRITE_COND;
context = ((GSource *) watch)->context;
}
g_mutex_unlock (watch->mutex);
if (context)
g_main_context_wakeup (context);
return rec->id;
}
#endif /* GST_REMOVE_DEPRECATED */

View file

@ -153,6 +153,8 @@ typedef struct _GstRTSPWatch GstRTSPWatch;
* gst_rtsp_connection_do_tunnel().
* @error_full: callback when an error occured with more information than
* the @error callback. Since 0.10.25
* @tunnel_lost: callback when the post connection of a tunnel is closed.
* Since 0.10.29
*
* Callback functions from a #GstRTSPWatch.
*
@ -171,9 +173,10 @@ typedef struct {
GstRTSPResult (*error_full) (GstRTSPWatch *watch, GstRTSPResult result,
GstRTSPMessage *message, guint id,
gpointer user_data);
GstRTSPResult (*tunnel_lost) (GstRTSPWatch *watch, gpointer user_data);
/*< private >*/
gpointer _gst_reserved[GST_PADDING - 1];
gpointer _gst_reserved[GST_PADDING - 2];
} GstRTSPWatchFuncs;
GstRTSPWatch * gst_rtsp_watch_new (GstRTSPConnection *conn,