Use ASYNC RTSP io

Use the async RTSP channels instead of spawning a new thread for each client.
If a sessionid is specified in a request, fail if we don't have the session.
This commit is contained in:
Wim Taymans 2009-02-18 18:57:31 +01:00
parent b70a6c9d83
commit 39c2e31e65
2 changed files with 170 additions and 201 deletions

View file

@ -184,7 +184,10 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
}
#if 0
gst_rtsp_connection_send (client->connection, response, &timeout);
#endif
gst_rtsp_channel_queue_message (client->channel, response);
gst_rtsp_message_unset (response);
}
@ -287,45 +290,6 @@ no_prepare:
}
}
/* Get the session or NULL when there was no session */
static GstRTSPSession *
find_session (GstRTSPClient *client, GstRTSPMessage *request)
{
GstRTSPResult res;
GstRTSPSession *session;
gchar *sessid;
res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0);
if (res == GST_RTSP_OK) {
if (client->session_pool == NULL)
goto no_pool;
/* we had a session in the request, find it again */
if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid)))
goto session_not_found;
client->timeout = gst_rtsp_session_get_timeout (session);
}
else
goto service_unavailable;
return session;
/* ERRORS */
no_pool:
{
return NULL;
}
session_not_found:
{
return NULL;
}
service_unavailable:
{
return NULL;
}
}
static gboolean
handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
{
@ -490,7 +454,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
/* ERRORS */
no_session:
{
/* error was sent */
send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request);
return FALSE;
}
not_found:
@ -580,7 +544,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
/* we have a valid transport now, set the destination of the client. */
g_free (ct->destination);
ct->destination = g_strdup (inet_ntoa (client->address.sin_addr));
ct->destination = g_strdup (client->connection->url->host);
if (session) {
g_object_ref (session);
@ -813,163 +777,101 @@ santize_uri (GstRTSPUrl *uri)
*d = '\0';
}
/* this function runs in a client specific thread and handles all rtsp messages
* with the client */
static gpointer
handle_client (GstRTSPClient *client)
static void
handle_request (GstRTSPClient *client, GstRTSPMessage *request)
{
GstRTSPMessage request = { 0 };
GstRTSPResult res;
GstRTSPMethod method;
const gchar *uristr;
GstRTSPUrl *uri;
GstRTSPVersion version;
while (TRUE) {
GTimeVal timeout;
GstRTSPSession *session;
timeout.tv_sec = client->timeout;
timeout.tv_usec = 0;
/* start by waiting for a message from the client */
res = gst_rtsp_connection_receive (client->connection, &request, &timeout);
if (res < 0) {
if (res == GST_RTSP_ETIMEOUT)
goto timeout;
goto receive_failed;
}
GstRTSPResult res;
GstRTSPSession *session;
gchar *sessid;
#ifdef DEBUG
gst_rtsp_message_dump (&request);
gst_rtsp_message_dump (request);
#endif
gst_rtsp_message_parse_request (&request, &method, &uristr, &version);
gst_rtsp_message_parse_request (request, &method, &uristr, &version);
if (version != GST_RTSP_VERSION_1_0) {
/* we can only handle 1.0 requests */
send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, &request);
continue;
}
/* we always try to parse the url first */
if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) {
send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request);
continue;
}
/* sanitize the uri */
santize_uri (uri);
/* get the session if there is any */
session = find_session (client, &request);
/* now see what is asked and dispatch to a dedicated handler */
switch (method) {
case GST_RTSP_OPTIONS:
handle_options_request (client, uri, session, &request);
break;
case GST_RTSP_DESCRIBE:
handle_describe_request (client, uri, session, &request);
break;
case GST_RTSP_SETUP:
handle_setup_request (client, uri, session, &request);
break;
case GST_RTSP_PLAY:
handle_play_request (client, uri, session, &request);
break;
case GST_RTSP_PAUSE:
handle_pause_request (client, uri, session, &request);
break;
case GST_RTSP_TEARDOWN:
handle_teardown_request (client, uri, session, &request);
break;
case GST_RTSP_ANNOUNCE:
case GST_RTSP_GET_PARAMETER:
case GST_RTSP_RECORD:
case GST_RTSP_REDIRECT:
case GST_RTSP_SET_PARAMETER:
send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, &request);
break;
case GST_RTSP_INVALID:
default:
send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, &request);
break;
}
if (session)
g_object_unref (session);
gst_rtsp_url_free (uri);
if (version != GST_RTSP_VERSION_1_0) {
/* we can only handle 1.0 requests */
send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request);
return;
}
g_object_unref (client);
return NULL;
/* we always try to parse the url first */
if ((res = gst_rtsp_url_parse (uristr, &uri)) != GST_RTSP_OK) {
send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request);
return;
}
/* sanitize the uri */
santize_uri (uri);
/* get the session if there is any */
res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_SESSION, &sessid, 0);
if (res == GST_RTSP_OK) {
if (client->session_pool == NULL)
goto no_pool;
/* we had a session in the request, find it again */
if (!(session = gst_rtsp_session_pool_find (client->session_pool, sessid)))
goto session_not_found;
client->timeout = gst_rtsp_session_get_timeout (session);
}
else
session = NULL;
/* now see what is asked and dispatch to a dedicated handler */
switch (method) {
case GST_RTSP_OPTIONS:
handle_options_request (client, uri, session, request);
break;
case GST_RTSP_DESCRIBE:
handle_describe_request (client, uri, session, request);
break;
case GST_RTSP_SETUP:
handle_setup_request (client, uri, session, request);
break;
case GST_RTSP_PLAY:
handle_play_request (client, uri, session, request);
break;
case GST_RTSP_PAUSE:
handle_pause_request (client, uri, session, request);
break;
case GST_RTSP_TEARDOWN:
handle_teardown_request (client, uri, session, request);
break;
case GST_RTSP_ANNOUNCE:
case GST_RTSP_GET_PARAMETER:
case GST_RTSP_RECORD:
case GST_RTSP_REDIRECT:
case GST_RTSP_SET_PARAMETER:
send_generic_response (client, GST_RTSP_STS_NOT_IMPLEMENTED, request);
break;
case GST_RTSP_INVALID:
default:
send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, request);
break;
}
if (session)
g_object_unref (session);
gst_rtsp_url_free (uri);
return;
/* ERRORS */
timeout:
no_pool:
{
g_message ("client timed out");
if (client->session_pool)
gst_rtsp_session_pool_cleanup (client->session_pool);
goto cleanup;
send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, request);
return;
}
receive_failed:
session_not_found:
{
gchar *str;
str = gst_rtsp_strresult (res);
g_message ("receive failed %d (%s), disconnect client %p", res,
str, client);
g_free (str);
goto cleanup;
}
cleanup:
{
gst_rtsp_message_unset (&request);
gst_rtsp_connection_close (client->connection);
g_object_unref (client);
return NULL;
}
}
/* called when we need to accept a new request from a client */
static gboolean
client_accept (GstRTSPClient *client, GIOChannel *channel)
{
/* a new client connected. */
int server_sock_fd, fd;
unsigned int address_len;
GstRTSPConnection *conn;
server_sock_fd = g_io_channel_unix_get_fd (channel);
address_len = sizeof (client->address);
memset (&client->address, 0, address_len);
fd = accept (server_sock_fd, (struct sockaddr *) &client->address,
&address_len);
if (fd == -1)
goto accept_failed;
/* now create the connection object */
gst_rtsp_connection_create (NULL, &conn);
conn->fd.fd = fd;
/* FIXME some hackery, we need to have a connection method to accept server
* connections */
gst_poll_add_fd (conn->fdset, &conn->fd);
g_message ("added new client %p ip %s with fd %d", client,
inet_ntoa (client->address.sin_addr), conn->fd.fd);
client->connection = conn;
return TRUE;
/* ERRORS */
accept_failed:
{
g_error ("Could not accept client on server socket %d: %s (%d)",
server_sock_fd, g_strerror (errno), errno);
return FALSE;
send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, request);
return;
}
}
@ -1087,6 +989,58 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
return result;
}
static GstRTSPResult
message_received (GstRTSPChannel *channel, GstRTSPMessage *message, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
g_message ("client %p: received a message", client);
handle_request (client, message);
return GST_RTSP_OK;
}
static GstRTSPResult
message_sent (GstRTSPChannel *channel, guint cseq, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
g_message ("client %p: sent a message with cseq %d", client, cseq);
return GST_RTSP_OK;
}
static GstRTSPResult
closed (GstRTSPChannel *channel, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
g_message ("client %p: connection closed", client);
return GST_RTSP_OK;
}
static GstRTSPResult
error (GstRTSPChannel *channel, GstRTSPResult result, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
gchar *str;
str = gst_rtsp_strresult (result);
g_message ("client %p: received an error %s", client, str);
g_free (str);
return GST_RTSP_OK;
}
static GstRTSPChannelFuncs channel_funcs = {
message_received,
message_sent,
closed,
error
};
/**
* gst_rtsp_client_attach:
* @client: a #GstRTSPClient
@ -1102,32 +1056,47 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
gboolean
gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
{
GError *error = NULL;
int sock;
GstRTSPConnection *conn;
GstRTSPResult res;
GSource *source;
GMainContext *context;
if (!client_accept (client, channel))
goto accept_failed;
/* a new client connected. */
sock = g_io_channel_unix_get_fd (channel);
/* client accepted, spawn a thread for the client, we don't need to join the
* thread */
g_object_ref (client);
client->thread = g_thread_create ((GThreadFunc)handle_client, client, FALSE, &error);
if (client->thread == NULL)
goto no_thread;
GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
g_message ("added new client %p ip %s:%d with fd %d", client,
conn->url->host, conn->url->port, conn->fd.fd);
client->connection = conn;
/* create channel for the connection and attach */
client->channel = gst_rtsp_channel_new (client->connection, &channel_funcs,
g_object_ref (client), g_object_unref);
/* find the context to add the channel */
if ((source = g_main_current_source ()))
context = g_source_get_context (source);
else
context = NULL;
g_message ("attaching to context %p", context);
gst_rtsp_channel_attach (client->channel, context);
gst_rtsp_channel_unref (client->channel);
return TRUE;
/* ERRORS */
accept_failed:
{
return FALSE;
}
no_thread:
{
if (error) {
g_warning ("could not create thread for client %p: %s", client, error->message);
g_error_free (error);
}
g_object_unref (client);
gchar *str = gst_rtsp_strresult (res);
g_error ("Could not accept client on server socket %d: %s",
sock, str);
g_free (str);
return FALSE;
}
}

View file

@ -70,7 +70,7 @@ struct _GstRTSPClient {
GObject parent;
GstRTSPConnection *connection;
struct sockaddr_in address;
GstRTSPChannel *channel;
GThread *thread;
guint timeout;