rtsp: check if the streams are still active

Don't try to read/write from an inactive stream. When we, for example,
transfer the second connection in tunneling mode, we are not interested anymore
on read/write activity on the old connection.
This commit is contained in:
Wim Taymans 2013-05-30 10:30:09 +02:00
parent d09028b4c3
commit 07babdd68a

View file

@ -2775,17 +2775,22 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
GstRTSPWatch * watch) GstRTSPWatch * watch)
{ {
GstRTSPResult res = GST_RTSP_ERROR; GstRTSPResult res = GST_RTSP_ERROR;
GstRTSPConnection *conn = watch->conn;
res = build_next (&watch->builder, &watch->message, watch->conn, FALSE); /* if this connection was already closed, stop now */
if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
goto eof;
res = build_next (&watch->builder, &watch->message, conn, FALSE);
if (res == GST_RTSP_EINTR) if (res == GST_RTSP_EINTR)
goto done; goto done;
else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
/* When we are in tunnelled mode, the read socket can be closed and we /* When we are in tunnelled mode, the read socket can be closed and we
* should be prepared for a new POST method to reopen it */ * should be prepared for a new POST method to reopen it */
if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { if (conn->tstate == TUNNEL_STATE_COMPLETE) {
/* remove the read connection for the tunnel */ /* remove the read connection for the tunnel */
/* we accept a new POST request */ /* we accept a new POST request */
watch->conn->tstate = TUNNEL_STATE_GET; conn->tstate = TUNNEL_STATE_GET;
/* and signal that we lost our tunnel */ /* and signal that we lost our tunnel */
if (watch->funcs.tunnel_lost) if (watch->funcs.tunnel_lost)
res = watch->funcs.tunnel_lost (watch, watch->user_data); res = watch->funcs.tunnel_lost (watch, watch->user_data);
@ -2793,14 +2798,14 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
} else } else
goto eof; goto eof;
} else if (G_LIKELY (res == GST_RTSP_OK)) { } else if (G_LIKELY (res == GST_RTSP_OK)) {
if (!watch->conn->manual_http && if (!conn->manual_http &&
watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (watch->conn->tstate == TUNNEL_STATE_NONE && if (conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_GET) { watch->message.type_data.request.method == GST_RTSP_GET) {
GstRTSPMessage *response; GstRTSPMessage *response;
GstRTSPStatusCode code; GstRTSPStatusCode code;
watch->conn->tstate = TUNNEL_STATE_GET; conn->tstate = TUNNEL_STATE_GET;
if (watch->funcs.tunnel_start) if (watch->funcs.tunnel_start)
code = watch->funcs.tunnel_start (watch, watch->user_data); code = watch->funcs.tunnel_start (watch, watch->user_data);
@ -2808,13 +2813,13 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
code = GST_RTSP_STS_OK; code = GST_RTSP_STS_OK;
/* queue the response */ /* queue the response */
response = gen_tunnel_reply (watch->conn, code, &watch->message); response = gen_tunnel_reply (conn, code, &watch->message);
gst_rtsp_watch_send_message (watch, response, NULL); gst_rtsp_watch_send_message (watch, response, NULL);
gst_rtsp_message_free (response); gst_rtsp_message_free (response);
goto read_done; goto read_done;
} else if (watch->conn->tstate == TUNNEL_STATE_NONE && } else if (conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_POST) { watch->message.type_data.request.method == GST_RTSP_POST) {
watch->conn->tstate = TUNNEL_STATE_POST; conn->tstate = TUNNEL_STATE_POST;
/* in the callback the connection should be tunneled with the /* in the callback the connection should be tunneled with the
* GET connection */ * GET connection */
@ -2827,7 +2832,7 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
} else } else
goto read_error; goto read_error;
if (!watch->conn->manual_http) { if (!conn->manual_http) {
/* if manual HTTP support is not enabled, then restore the message to /* if manual HTTP support is not enabled, then restore the message to
* what it would have looked like without the support for parsing HTTP * what it would have looked like without the support for parsing HTTP
* messages being present */ * messages being present */
@ -2885,14 +2890,21 @@ static gboolean
gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
gpointer user_data G_GNUC_UNUSED) gpointer user_data G_GNUC_UNUSED)
{ {
return TRUE; GstRTSPWatch *watch = (GstRTSPWatch *) source;
return watch->keep_running;
} }
static gboolean static gboolean
gst_rtsp_source_dispatch_write (GPollableInputStream * stream, gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
GstRTSPWatch * watch) GstRTSPWatch * watch)
{ {
GstRTSPResult res = GST_RTSP_ERROR; GstRTSPResult res = GST_RTSP_ERROR;
GstRTSPConnection *conn = watch->conn;
/* if this connection was already closed, stop now */
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream)
goto eof;
g_mutex_lock (&watch->mutex); g_mutex_lock (&watch->mutex);
do { do {
@ -2914,8 +2926,8 @@ gst_rtsp_source_dispatch_write (GPollableInputStream * stream,
g_slice_free (GstRTSPRec, rec); g_slice_free (GstRTSPRec, rec);
} }
res = write_bytes (watch->conn->output_stream, watch->write_data, res = write_bytes (conn->output_stream, watch->write_data,
&watch->write_off, watch->write_size, FALSE, watch->conn->cancellable); &watch->write_off, watch->write_size, FALSE, conn->cancellable);
g_mutex_unlock (&watch->mutex); g_mutex_unlock (&watch->mutex);
if (res == GST_RTSP_EINTR) if (res == GST_RTSP_EINTR)