From 422044244177f23a6351837170863a02305341c2 Mon Sep 17 00:00:00 2001 From: Ognyan Tonchev Date: Sun, 2 Mar 2014 11:58:58 +0100 Subject: [PATCH] rtspconnection: Call closed() when GET is closed in tunneled mode This patch adds read source on the write socket in tunneled mode and we get a callback when client disconnects the GET channel. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=725313 --- gst-libs/gst/rtsp/gstrtspconnection.c | 109 ++++++++++++++++++++++++++ tests/check/libs/rtspconnection.c | 10 ++- 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 326d38410c..13ca1d747a 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -120,6 +120,9 @@ struct _GstRTSPConnection GInputStream *input_stream; GOutputStream *output_stream; + /* this is a read source we add on the write socket in tunneled mode to be + * able to detect when client disconnects the GET channel */ + GInputStream *control_stream; /* connection state */ GSocket *read_socket; @@ -388,6 +391,7 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip, newconn->write_socket = newconn->read_socket = newconn->socket0; newconn->input_stream = g_io_stream_get_input_stream (stream); newconn->output_stream = g_io_stream_get_output_stream (stream); + newconn->control_stream = NULL; newconn->remote_ip = g_strdup (ip); newconn->local_ip = local_ip; newconn->initial_buffer = g_strdup (initial_buffer); @@ -715,6 +719,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri) conn->socket1 = socket; conn->write_socket = conn->socket1; conn->output_stream = g_io_stream_get_output_stream (conn->stream1); + conn->control_stream = NULL; /* create the POST request for the write connection */ GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri), @@ -856,6 +861,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) conn->write_socket = conn->socket0; conn->input_stream = g_io_stream_get_input_stream (conn->stream0); conn->output_stream = g_io_stream_get_output_stream (conn->stream0); + conn->control_stream = NULL; if (conn->tunneled) { res = setup_tunneling (conn, timeout, uri); @@ -2885,6 +2891,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, conn->socket1 = conn2->socket0; conn->stream1 = conn2->stream0; conn->input_stream = conn2->input_stream; + conn->control_stream = g_io_stream_get_input_stream (conn->stream0); /* clean up some of the state of conn2 */ g_cancellable_cancel (conn2->cancellable); @@ -2893,6 +2900,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, conn2->stream0 = NULL; conn2->input_stream = NULL; conn2->output_stream = NULL; + conn2->control_stream = NULL; g_cancellable_reset (conn2->cancellable); /* We make socket0 the write socket and socket1 the read socket. */ @@ -2976,6 +2984,7 @@ struct _GstRTSPWatch GSource *readsrc; GSource *writesrc; + GSource *controlsrc; gboolean keep_running; @@ -3016,6 +3025,62 @@ gst_rtsp_source_check (GSource * source) return FALSE; } +static gboolean +gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream, + GstRTSPWatch * watch) +{ + gssize count; + guint8 buffer[1024]; + GError *error = NULL; + + /* try to read in order to be able to detect errors, we read 1k in case some + * client actually decides to send data on the GET channel */ + count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL, + &error); + if (count == 0) { + /* other end closed the socket */ + goto eof; + } + + if (count < 0) { + GST_DEBUG ("%s", error->message); + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) || + g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&error); + goto done; + } + g_clear_error (&error); + goto read_error; + } + + /* client sent data on the GET channel, ignore it */ + +done: + return TRUE; + + /* ERRORS */ +eof: + { + if (watch->funcs.closed) + watch->funcs.closed (watch, watch->user_data); + + /* the read connection was closed, stop the watch now */ + watch->keep_running = FALSE; + + return FALSE; + } +read_error: + { + if (watch->funcs.error_full) + watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message, + 0, watch->user_data); + else if (watch->funcs.error) + watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data); + + goto eof; + } +} + static gboolean gst_rtsp_source_dispatch_read (GPollableInputStream * stream, GstRTSPWatch * watch) @@ -3184,6 +3249,21 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, watch->writesrc = NULL; /* we create and add the write source again when we actually have * something to write */ + + /* since write source is now removed we add read source on the write + * socket instead to be able to detect when client closes get channel + * in tunneled mode */ + if (watch->conn->control_stream) { + watch->controlsrc = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM + (watch->conn->control_stream), NULL); + g_source_set_callback (watch->controlsrc, + (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, + NULL); + g_source_add_child_source ((GSource *) watch, watch->controlsrc); + } else { + watch->controlsrc = NULL; + } } break; } @@ -3264,6 +3344,8 @@ gst_rtsp_source_finalize (GSource * source) g_source_unref (watch->readsrc); if (watch->writesrc) g_source_unref (watch->writesrc); + if (watch->controlsrc) + g_source_unref (watch->controlsrc); g_mutex_clear (&watch->mutex); @@ -3347,6 +3429,11 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) g_source_unref (watch->writesrc); watch->writesrc = NULL; } + if (watch->controlsrc) { + g_source_remove_child_source ((GSource *) watch, watch->controlsrc); + g_source_unref (watch->controlsrc); + watch->controlsrc = NULL; + } if (watch->conn->input_stream) { watch->readsrc = @@ -3361,6 +3448,20 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) /* we create and add the write source when we actually have something to * write */ + + /* when write source is not added we add read source on the write socket + * instead to be able to detect when client closes get channel in tunneled + * mode */ + if (watch->conn->control_stream) { + watch->controlsrc = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM + (watch->conn->control_stream), NULL); + g_source_set_callback (watch->controlsrc, + (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL); + g_source_add_child_source ((GSource *) watch, watch->controlsrc); + } else { + watch->controlsrc = NULL; + } } /** @@ -3530,6 +3631,14 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, * socket */ context = ((GSource *) watch)->context; if (!watch->writesrc) { + /* remove the read source on the write socket, we will be able to detect + * errors while writing */ + if (watch->controlsrc) { + g_source_remove_child_source ((GSource *) watch, watch->controlsrc); + g_source_unref (watch->controlsrc); + watch->controlsrc = NULL; + } + watch->writesrc = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (watch->conn->output_stream), NULL); diff --git a/tests/check/libs/rtspconnection.c b/tests/check/libs/rtspconnection.c index f41277e0b4..f874c2708b 100644 --- a/tests/check/libs/rtspconnection.c +++ b/tests/check/libs/rtspconnection.c @@ -330,10 +330,18 @@ GST_START_TEST (test_rtspconnection_tunnel_setup) gst_rtsp_connection_free (rtsp_conn2); rtsp_conn2 = NULL; + /* check if rtspconnection can detect close of the get channel */ + g_object_unref (client_get); + while (!g_main_context_iteration (NULL, TRUE)); + fail_unless (tunnel_start_count == 1); + fail_unless (tunnel_complete_count == 2); + fail_unless (tunnel_lost_count == 1); + fail_unless (closed_count == 1); + fail_unless (gst_rtsp_connection_close (rtsp_conn1) == GST_RTSP_OK); fail_unless (gst_rtsp_connection_free (rtsp_conn1) == GST_RTSP_OK); + g_object_unref (client_post); - g_object_unref (client_get); g_object_unref (server_post); g_object_unref (server_get); }