rtsp: use child sources instead of using the sockets

Use the source of the pollable input/output streams instead of
accessing the sockets directly.
This commit is contained in:
Wim Taymans 2013-05-29 17:44:30 +02:00
parent 4ada677095
commit d09028b4c3

View file

@ -2730,8 +2730,8 @@ struct _GstRTSPWatch
GstRTSPBuilder builder; GstRTSPBuilder builder;
GstRTSPMessage message; GstRTSPMessage message;
GPollFD readfd; GSource *readsrc;
GPollFD writefd; GSource *writesrc;
/* queued message for transmission */ /* queued message for transmission */
guint id; guint id;
@ -2767,172 +2767,95 @@ gst_rtsp_source_prepare (GSource * source, gint * timeout)
static gboolean static gboolean
gst_rtsp_source_check (GSource * source) gst_rtsp_source_check (GSource * source)
{ {
GstRTSPWatch *watch = (GstRTSPWatch *) source;
if (watch->readfd.revents & READ_COND)
return TRUE;
if (watch->writefd.revents & WRITE_COND)
return TRUE;
return FALSE; return FALSE;
} }
static gboolean static gboolean
gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
gpointer user_data G_GNUC_UNUSED) GstRTSPWatch * watch)
{ {
GstRTSPWatch *watch = (GstRTSPWatch *) source;
GstRTSPResult res = GST_RTSP_ERROR; GstRTSPResult res = GST_RTSP_ERROR;
gboolean keep_running = TRUE;
/* first read as much as we can */ res = build_next (&watch->builder, &watch->message, watch->conn, FALSE);
if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { if (res == GST_RTSP_EINTR)
do { goto done;
if (watch->readfd.revents & READ_ERR) else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
goto read_error; /* When we are in tunnelled mode, the read socket can be closed and we
* should be prepared for a new POST method to reopen it */
if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
/* remove the read connection for the tunnel */
/* we accept a new POST request */
watch->conn->tstate = TUNNEL_STATE_GET;
/* and signal that we lost our tunnel */
if (watch->funcs.tunnel_lost)
res = watch->funcs.tunnel_lost (watch, watch->user_data);
goto read_done;
} else
goto eof;
} else if (G_LIKELY (res == GST_RTSP_OK)) {
if (!watch->conn->manual_http &&
watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (watch->conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_GET) {
GstRTSPMessage *response;
GstRTSPStatusCode code;
res = build_next (&watch->builder, &watch->message, watch->conn, FALSE); watch->conn->tstate = TUNNEL_STATE_GET;
if (res == GST_RTSP_EINTR)
break;
else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
watch->readfd.events = 0;
watch->readfd.revents = 0;
g_source_remove_poll ((GSource *) watch, &watch->readfd);
/* When we are in tunnelled mode, the read socket can be closed and we
* should be prepared for a new POST method to reopen it */
if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
/* remove the read connection for the tunnel */
/* we accept a new POST request */
watch->conn->tstate = TUNNEL_STATE_GET;
/* and signal that we lost our tunnel */
if (watch->funcs.tunnel_lost)
res = watch->funcs.tunnel_lost (watch, watch->user_data);
goto read_done;
} else
goto eof;
} else if (G_LIKELY (res == GST_RTSP_OK)) {
if (!watch->conn->manual_http &&
watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (watch->conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_GET) {
GstRTSPMessage *response;
GstRTSPStatusCode code;
watch->conn->tstate = TUNNEL_STATE_GET; if (watch->funcs.tunnel_start)
code = watch->funcs.tunnel_start (watch, watch->user_data);
else
code = GST_RTSP_STS_OK;
if (watch->funcs.tunnel_start) /* queue the response */
code = watch->funcs.tunnel_start (watch, watch->user_data); response = gen_tunnel_reply (watch->conn, code, &watch->message);
else gst_rtsp_watch_send_message (watch, response, NULL);
code = GST_RTSP_STS_OK; gst_rtsp_message_free (response);
goto read_done;
} else if (watch->conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_POST) {
watch->conn->tstate = TUNNEL_STATE_POST;
/* queue the response */ /* in the callback the connection should be tunneled with the
response = gen_tunnel_reply (watch->conn, code, &watch->message); * GET connection */
gst_rtsp_watch_send_message (watch, response, NULL); if (watch->funcs.tunnel_complete) {
gst_rtsp_message_free (response); watch->funcs.tunnel_complete (watch, watch->user_data);
goto read_done;
} else if (watch->conn->tstate == TUNNEL_STATE_NONE &&
watch->message.type_data.request.method == GST_RTSP_POST) {
watch->conn->tstate = TUNNEL_STATE_POST;
/* in the callback the connection should be tunneled with the
* GET connection */
if (watch->funcs.tunnel_complete) {
watch->funcs.tunnel_complete (watch, watch->user_data);
keep_running = !(watch->conn->read_socket == NULL &&
watch->conn->write_socket == NULL);
if (!keep_running)
goto done;
}
goto read_done;
}
} }
goto read_done;
} }
}
} else
goto read_error;
if (!watch->conn->manual_http) { if (!watch->conn->manual_http) {
/* if manual HTTP support is not enabled, then restore the message to /* if manual HTTP support is not enabled, then restore the message to
* what it would have looked like without the support for parsing HTTP * what it would have looked like without the support for parsing HTTP
* messages being present */ * messages being present */
if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
watch->message.type = GST_RTSP_MESSAGE_REQUEST; watch->message.type = GST_RTSP_MESSAGE_REQUEST;
watch->message.type_data.request.method = GST_RTSP_INVALID; watch->message.type_data.request.method = GST_RTSP_INVALID;
if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
res = GST_RTSP_EPARSE; res = GST_RTSP_EPARSE;
} else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
watch->message.type = GST_RTSP_MESSAGE_RESPONSE; watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
watch->message.type_data.response.version = watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
GST_RTSP_VERSION_INVALID; res = GST_RTSP_EPARSE;
res = GST_RTSP_EPARSE; }
}
}
if (G_LIKELY (res == GST_RTSP_OK)) {
if (watch->funcs.message_received)
watch->funcs.message_received (watch, &watch->message,
watch->user_data);
} else {
goto read_error;
}
read_done:
gst_rtsp_message_unset (&watch->message);
build_reset (&watch->builder);
} while (FALSE);
} }
if (G_LIKELY (res != GST_RTSP_OK))
goto read_error;
if (watch->writefd.revents & WRITE_COND) { if (watch->funcs.message_received)
if (watch->writefd.revents & WRITE_ERR) watch->funcs.message_received (watch, &watch->message, watch->user_data);
goto write_error;
g_mutex_lock (&watch->mutex); read_done:
do { gst_rtsp_message_unset (&watch->message);
if (watch->write_data == NULL) { build_reset (&watch->builder);
GstRTSPRec *rec;
/* get a new message from the queue */
rec = g_queue_pop_tail (watch->messages);
if (rec == NULL)
break;
watch->messages_bytes -= rec->size;
watch->write_off = 0;
watch->write_data = rec->data;
watch->write_size = rec->size;
watch->write_id = rec->id;
g_slice_free (GstRTSPRec, rec);
}
res = write_bytes (watch->conn->output_stream, watch->write_data,
&watch->write_off, watch->write_size, FALSE,
watch->conn->cancellable);
g_mutex_unlock (&watch->mutex);
if (res == GST_RTSP_EINTR)
goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) {
if (watch->funcs.message_sent)
watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
} else {
goto write_error;
}
g_mutex_lock (&watch->mutex);
g_free (watch->write_data);
watch->write_data = NULL;
} while (TRUE);
watch->writefd.events = WRITE_ERR;
g_mutex_unlock (&watch->mutex);
}
done: done:
write_blocked: return TRUE;
return keep_running;
/* ERRORS */ /* ERRORS */
eof: eof:
@ -2945,42 +2868,87 @@ eof:
} }
read_error: read_error:
{ {
watch->readfd.events = 0; if (watch->funcs.error_full)
watch->readfd.revents = 0; GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
g_source_remove_poll ((GSource *) watch, &watch->readfd); 0, watch->user_data), error);
keep_running = (watch->writefd.events != 0);
if (keep_running) {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
0, watch->user_data), error);
else
goto error;
} else
goto eof;
}
write_error:
{
watch->writefd.events = 0;
watch->writefd.revents = 0;
g_source_remove_poll ((GSource *) watch, &watch->writefd);
keep_running = (watch->readfd.events != 0);
if (keep_running) {
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
watch->write_id, watch->user_data), error);
else
goto error;
} else
goto eof;
} }
error: error:
{ {
if (watch->funcs.error) if (watch->funcs.error)
watch->funcs.error (watch, res, watch->user_data); watch->funcs.error (watch, res, watch->user_data);
return keep_running; return FALSE;
}
}
static gboolean
gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
gpointer user_data G_GNUC_UNUSED)
{
return TRUE;
}
static gboolean
gst_rtsp_source_dispatch_write (GPollableInputStream * stream,
GstRTSPWatch * watch)
{
GstRTSPResult res = GST_RTSP_ERROR;
g_mutex_lock (&watch->mutex);
do {
if (watch->write_data == NULL) {
GstRTSPRec *rec;
/* get a new message from the queue */
rec = g_queue_pop_tail (watch->messages);
if (rec == NULL)
break;
watch->messages_bytes -= rec->size;
watch->write_off = 0;
watch->write_data = rec->data;
watch->write_size = rec->size;
watch->write_id = rec->id;
g_slice_free (GstRTSPRec, rec);
}
res = write_bytes (watch->conn->output_stream, watch->write_data,
&watch->write_off, watch->write_size, FALSE, watch->conn->cancellable);
g_mutex_unlock (&watch->mutex);
if (res == GST_RTSP_EINTR)
goto write_blocked;
else if (G_LIKELY (res == GST_RTSP_OK)) {
if (watch->funcs.message_sent)
watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
} else {
goto write_error;
}
g_mutex_lock (&watch->mutex);
g_free (watch->write_data);
watch->write_data = NULL;
} while (TRUE);
g_mutex_unlock (&watch->mutex);
write_blocked:
return TRUE;
/* ERRORS */
write_error:
{
if (watch->funcs.error_full)
GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
watch->write_id, watch->user_data), error);
}
error:
{
if (watch->funcs.error)
watch->funcs.error (watch, res, watch->user_data);
return FALSE;
} }
} }
@ -3060,9 +3028,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
g_mutex_init (&result->mutex); g_mutex_init (&result->mutex);
result->messages = g_queue_new (); result->messages = g_queue_new ();
result->readfd.fd = -1;
result->writefd.fd = -1;
gst_rtsp_watch_reset (result); gst_rtsp_watch_reset (result);
result->funcs = *funcs; result->funcs = *funcs;
@ -3082,23 +3047,30 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
void void
gst_rtsp_watch_reset (GstRTSPWatch * watch) gst_rtsp_watch_reset (GstRTSPWatch * watch)
{ {
if (watch->readfd.fd != -1) if (watch->readsrc)
g_source_remove_poll ((GSource *) watch, &watch->readfd); g_source_remove_child_source ((GSource *) watch, watch->readsrc);
if (watch->writefd.fd != -1) if (watch->writesrc)
g_source_remove_poll ((GSource *) watch, &watch->writefd); g_source_remove_child_source ((GSource *) watch, watch->writesrc);
watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket); watch->readsrc =
watch->readfd.events = READ_COND; g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
watch->readfd.revents = 0; (watch->conn->input_stream), NULL);
g_source_set_callback (watch->readsrc,
(GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
watch->writesrc =
g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(watch->conn->output_stream), NULL);
g_source_set_callback (watch->writesrc,
(GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket); if (watch->readsrc) {
watch->writefd.events = WRITE_ERR; g_source_add_child_source ((GSource *) watch, watch->readsrc);
watch->writefd.revents = 0; g_source_unref (watch->readsrc);
}
if (watch->readfd.fd != -1) if (watch->writesrc) {
g_source_add_poll ((GSource *) watch, &watch->readfd); g_source_add_child_source ((GSource *) watch, watch->writesrc);
if (watch->writefd.fd != -1) g_source_unref (watch->writesrc);
g_source_add_poll ((GSource *) watch, &watch->writefd); }
} }
/** /**
@ -3266,10 +3238,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
/* make sure the main context will now also check for writability on the /* make sure the main context will now also check for writability on the
* socket */ * socket */
if (watch->writefd.events != WRITE_COND) { context = ((GSource *) watch)->context;
watch->writefd.events = WRITE_COND;
context = ((GSource *) watch)->context;
}
if (id != NULL) if (id != NULL)
*id = rec->id; *id = rec->id;