rtspconnection: Call closed() when GET is closed in tunneled mode

This patch adds read source on the write socket in tunneled
mode and we get a callback when client disconnects the GET
channel.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=725313
This commit is contained in:
Ognyan Tonchev 2014-03-02 11:58:58 +01:00 committed by Wim Taymans
parent 900c204eb9
commit 4220442441
2 changed files with 118 additions and 1 deletions

View file

@ -120,6 +120,9 @@ struct _GstRTSPConnection
GInputStream *input_stream;
GOutputStream *output_stream;
/* this is a read source we add on the write socket in tunneled mode to be
* able to detect when client disconnects the GET channel */
GInputStream *control_stream;
/* connection state */
GSocket *read_socket;
@ -388,6 +391,7 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
newconn->write_socket = newconn->read_socket = newconn->socket0;
newconn->input_stream = g_io_stream_get_input_stream (stream);
newconn->output_stream = g_io_stream_get_output_stream (stream);
newconn->control_stream = NULL;
newconn->remote_ip = g_strdup (ip);
newconn->local_ip = local_ip;
newconn->initial_buffer = g_strdup (initial_buffer);
@ -715,6 +719,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
conn->socket1 = socket;
conn->write_socket = conn->socket1;
conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
conn->control_stream = NULL;
/* create the POST request for the write connection */
GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri),
@ -856,6 +861,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
conn->write_socket = conn->socket0;
conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
conn->control_stream = NULL;
if (conn->tunneled) {
res = setup_tunneling (conn, timeout, uri);
@ -2885,6 +2891,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
conn->socket1 = conn2->socket0;
conn->stream1 = conn2->stream0;
conn->input_stream = conn2->input_stream;
conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
/* clean up some of the state of conn2 */
g_cancellable_cancel (conn2->cancellable);
@ -2893,6 +2900,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
conn2->stream0 = NULL;
conn2->input_stream = NULL;
conn2->output_stream = NULL;
conn2->control_stream = NULL;
g_cancellable_reset (conn2->cancellable);
/* We make socket0 the write socket and socket1 the read socket. */
@ -2976,6 +2984,7 @@ struct _GstRTSPWatch
GSource *readsrc;
GSource *writesrc;
GSource *controlsrc;
gboolean keep_running;
@ -3016,6 +3025,62 @@ gst_rtsp_source_check (GSource * source)
return FALSE;
}
static gboolean
gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
GstRTSPWatch * watch)
{
gssize count;
guint8 buffer[1024];
GError *error = NULL;
/* try to read in order to be able to detect errors, we read 1k in case some
* client actually decides to send data on the GET channel */
count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
&error);
if (count == 0) {
/* other end closed the socket */
goto eof;
}
if (count < 0) {
GST_DEBUG ("%s", error->message);
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&error);
goto done;
}
g_clear_error (&error);
goto read_error;
}
/* client sent data on the GET channel, ignore it */
done:
return TRUE;
/* ERRORS */
eof:
{
if (watch->funcs.closed)
watch->funcs.closed (watch, watch->user_data);
/* the read connection was closed, stop the watch now */
watch->keep_running = FALSE;
return FALSE;
}
read_error:
{
if (watch->funcs.error_full)
watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
0, watch->user_data);
else if (watch->funcs.error)
watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
goto eof;
}
}
static gboolean
gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
GstRTSPWatch * watch)
@ -3184,6 +3249,21 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
watch->writesrc = NULL;
/* we create and add the write source again when we actually have
* something to write */
/* since write source is now removed we add read source on the write
* socket instead to be able to detect when client closes get channel
* in tunneled mode */
if (watch->conn->control_stream) {
watch->controlsrc =
g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(watch->conn->control_stream), NULL);
g_source_set_callback (watch->controlsrc,
(GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
NULL);
g_source_add_child_source ((GSource *) watch, watch->controlsrc);
} else {
watch->controlsrc = NULL;
}
}
break;
}
@ -3264,6 +3344,8 @@ gst_rtsp_source_finalize (GSource * source)
g_source_unref (watch->readsrc);
if (watch->writesrc)
g_source_unref (watch->writesrc);
if (watch->controlsrc)
g_source_unref (watch->controlsrc);
g_mutex_clear (&watch->mutex);
@ -3347,6 +3429,11 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
g_source_unref (watch->writesrc);
watch->writesrc = NULL;
}
if (watch->controlsrc) {
g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
g_source_unref (watch->controlsrc);
watch->controlsrc = NULL;
}
if (watch->conn->input_stream) {
watch->readsrc =
@ -3361,6 +3448,20 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
/* we create and add the write source when we actually have something to
* write */
/* when write source is not added we add read source on the write socket
* instead to be able to detect when client closes get channel in tunneled
* mode */
if (watch->conn->control_stream) {
watch->controlsrc =
g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(watch->conn->control_stream), NULL);
g_source_set_callback (watch->controlsrc,
(GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
g_source_add_child_source ((GSource *) watch, watch->controlsrc);
} else {
watch->controlsrc = NULL;
}
}
/**
@ -3530,6 +3631,14 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
* socket */
context = ((GSource *) watch)->context;
if (!watch->writesrc) {
/* remove the read source on the write socket, we will be able to detect
* errors while writing */
if (watch->controlsrc) {
g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
g_source_unref (watch->controlsrc);
watch->controlsrc = NULL;
}
watch->writesrc =
g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(watch->conn->output_stream), NULL);

View file

@ -330,10 +330,18 @@ GST_START_TEST (test_rtspconnection_tunnel_setup)
gst_rtsp_connection_free (rtsp_conn2);
rtsp_conn2 = NULL;
/* check if rtspconnection can detect close of the get channel */
g_object_unref (client_get);
while (!g_main_context_iteration (NULL, TRUE));
fail_unless (tunnel_start_count == 1);
fail_unless (tunnel_complete_count == 2);
fail_unless (tunnel_lost_count == 1);
fail_unless (closed_count == 1);
fail_unless (gst_rtsp_connection_close (rtsp_conn1) == GST_RTSP_OK);
fail_unless (gst_rtsp_connection_free (rtsp_conn1) == GST_RTSP_OK);
g_object_unref (client_post);
g_object_unref (client_get);
g_object_unref (server_post);
g_object_unref (server_get);
}