server: rework maincontext handling in clients

Make a separate method to attach a client to a MainContext.

Let the server decide in what GMainContext the client will operate and give this
context to the client in attach. Then the server can later decide to use a
separate thread for each client or just use the mainthread.
This commit is contained in:
Wim Taymans 2012-11-12 14:09:09 +01:00
parent 5b4340067a
commit a58d404e1f
3 changed files with 123 additions and 42 deletions

View file

@ -214,7 +214,7 @@ gst_rtsp_client_finalize (GObject * obj)
GST_INFO ("finalize client %p", client);
if (client->watchid)
if (client->watch)
g_source_destroy ((GSource *) client->watch);
client_cleanup_sessions (client);
@ -2081,20 +2081,17 @@ static void
client_watch_notify (GstRTSPClient * client)
{
GST_INFO ("client %p: watch destroyed", client);
client->watchid = 0;
client->watch = NULL;
g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_CLOSED], 0, NULL);
g_object_unref (client);
}
static gboolean
attach_client (GstRTSPClient * client, GSocket * socket,
setup_client (GstRTSPClient * client, GSocket * socket,
GstRTSPConnection * conn, GError ** error)
{
GSocket *read_socket;
GSocketAddress *address;
GSource *source;
GMainContext *context;
GstRTSPUrl *url;
read_socket = gst_rtsp_connection_get_read_socket (conn);
@ -2124,21 +2121,6 @@ attach_client (GstRTSPClient * client, GSocket * socket,
client->connection = conn;
/* create watch for the connection and attach */
client->watch = gst_rtsp_watch_new (client->connection, &watch_funcs,
g_object_ref (client), (GDestroyNotify) client_watch_notify);
/* find the context to add the watch */
if ((source = g_main_current_source ()))
context = g_source_get_context (source);
else
context = NULL;
GST_INFO ("attaching to context %p", context);
client->watchid = gst_rtsp_watch_attach (client->watch, context);
gst_rtsp_watch_unref (client->watch);
return TRUE;
/* ERRORS */
@ -2155,7 +2137,8 @@ no_address:
* @socket: a #GSocket
* @ip: the IP address of the remote client
* @port: the port used by the other end
* @initial_buffer: any initial data that was already read from the socket
* @initial_buffer: any zero terminated initial data that was already read from
* the socket
* @error: a #GError
*
* Take an existing network socket and use it for an RTSP connection.
@ -2163,7 +2146,7 @@ no_address:
* Returns: %TRUE on success.
*/
gboolean
gst_rtsp_client_create_from_socket (GstRTSPClient * client, GSocket * socket,
gst_rtsp_client_use_socket (GstRTSPClient * client, GSocket * socket,
const gchar * ip, gint port, const gchar * initial_buffer, GError ** error)
{
GstRTSPConnection *conn;
@ -2172,7 +2155,7 @@ gst_rtsp_client_create_from_socket (GstRTSPClient * client, GSocket * socket,
GST_RTSP_CHECK (gst_rtsp_connection_create_from_socket (socket, ip, port,
initial_buffer, &conn), no_connection);
return attach_client (client, socket, conn, error);
return setup_client (client, socket, conn, error);
/* ERRORS */
no_connection:
@ -2189,14 +2172,12 @@ no_connection:
* gst_rtsp_client_accept:
* @client: a #GstRTSPClient
* @socket: a #GSocket
* @context: the context to run in
* @cancellable: a #GCancellable
* @error: a #GError
*
* Accept a new connection for @client on @socket.
*
* This function should be called when the client properties and urls are fully
* configured and the client is ready to start.
*
* Returns: %TRUE if the client could be accepted.
*/
gboolean
@ -2210,7 +2191,7 @@ gst_rtsp_client_accept (GstRTSPClient * client, GSocket * socket,
GST_RTSP_CHECK (gst_rtsp_connection_accept (socket, &conn, cancellable),
accept_failed);
return attach_client (client, socket, conn, error);
return setup_client (client, socket, conn, error);
/* ERRORS */
accept_failed:
@ -2222,3 +2203,36 @@ accept_failed:
return FALSE;
}
}
/**
* gst_rtsp_client_attach:
* @client: a #GstRTSPClient
* @context: (allow-none): a #GMainContext
*
* Attaches @client to @context. When the mainloop for @context is run, the
* client will be dispatched. When @context is NULL, the default context will be
* used).
*
* This function should be called when the client properties and urls are fully
* configured and the client is ready to start.
*
* Returns: the ID (greater than 0) for the source within the GMainContext.
*/
guint
gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context)
{
guint res;
g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), 0);
g_return_val_if_fail (client->watch == NULL, 0);
/* create watch for the connection and attach */
client->watch = gst_rtsp_watch_new (client->connection, &watch_funcs,
g_object_ref (client), (GDestroyNotify) client_watch_notify);
GST_INFO ("attaching to context %p", context);
res = gst_rtsp_watch_attach (client->watch, context);
gst_rtsp_watch_unref (client->watch);
return res;
}

View file

@ -77,7 +77,6 @@ struct _GstRTSPClientState {
*
* @connection: the connection object handling the client request.
* @watch: watch for the connection
* @watchid: id of the watch
* @ip: ip address used by the client to connect to us
* @use_client_settings: whether to allow client transport settings for multicast
* @session_pool: handle to the session pool used by the client.
@ -94,7 +93,6 @@ struct _GstRTSPClient {
GstRTSPConnection *connection;
GstRTSPWatch *watch;
guint watchid;
gchar *server_ip;
gboolean is_ipv6;
gboolean use_client_settings;
@ -164,6 +162,10 @@ gboolean gst_rtsp_client_create_from_socket(GstRTSPClient * client,
const gchar *initial_buffer,
GError **error);
guint gst_rtsp_client_attach (GstRTSPClient *client,
GMainContext *context);
G_END_DECLS
#endif /* __GST_RTSP_CLIENT_H__ */

View file

@ -749,34 +749,99 @@ close_error:
}
}
static void
unmanage_client (GstRTSPClient * client, GstRTSPServer * server)
typedef struct
{
GstRTSPServer *server;
GMainLoop *loop;
GMainContext *context;
GstRTSPClient *client;
} ClientContext;
static void
free_client_context (ClientContext * ctx)
{
g_main_context_unref (ctx->context);
if (ctx->loop)
g_main_loop_unref (ctx->loop);
g_object_unref (ctx->client);
g_slice_free (ClientContext, ctx);
}
static gpointer
do_loop (ClientContext * ctx)
{
GST_INFO ("enter mainloop");
g_main_loop_run (ctx->loop);
GST_INFO ("exit mainloop");
free_client_context (ctx);
return NULL;
}
static void
unmanage_client (GstRTSPClient * client, ClientContext * ctx)
{
GstRTSPServer *server = ctx->server;
GST_DEBUG_OBJECT (server, "unmanage client %p", client);
g_object_ref (server);
gst_rtsp_client_set_server (client, NULL);
GST_RTSP_SERVER_LOCK (server);
server->clients = g_list_remove (server->clients, client);
server->clients = g_list_remove (server->clients, ctx);
GST_RTSP_SERVER_UNLOCK (server);
g_object_unref (server);
g_object_unref (client);
if (ctx->loop)
g_main_loop_quit (ctx->loop);
else
free_client_context (ctx);
g_object_unref (server);
}
/* add the client to the active list of clients, takes ownership of
* the client */
/* add the client context to the active list of clients, takes ownership
* of client */
static void
manage_client (GstRTSPServer * server, GstRTSPClient * client)
{
ClientContext *ctx;
GST_DEBUG_OBJECT (server, "manage client %p", client);
gst_rtsp_client_set_server (client, server);
ctx = g_slice_new0 (ClientContext);
ctx->server = server;
ctx->client = client;
#if 1
{
GSource *source;
/* find the context to add the watch */
if ((source = g_main_current_source ()))
ctx->context = g_main_context_ref (g_source_get_context (source));
else
ctx->context = NULL;
}
#else
ctx->context = g_main_context_new ();
ctx->loop = g_main_loop_new (ctx->context, TRUE);
ctx->dothread = TRUE;
#endif
gst_rtsp_client_attach (client, ctx->context);
GST_RTSP_SERVER_LOCK (server);
g_signal_connect (client, "closed", (GCallback) unmanage_client, server);
server->clients = g_list_prepend (server->clients, client);
g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx);
server->clients = g_list_prepend (server->clients, ctx);
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->loop) {
GThread *thread;
thread = g_thread_new ("MainLoop Thread", (GThreadFunc) do_loop, ctx);
g_thread_unref (thread);
}
}
static GstRTSPClient *
@ -853,8 +918,8 @@ gst_rtsp_server_transfer_connection (GstRTSPServer * server, GSocket * socket,
goto client_failed;
/* a new client connected, create a client object to handle the client. */
if (!gst_rtsp_client_create_from_socket (client, socket, ip, port,
initial_buffer, &error)) {
if (!gst_rtsp_client_use_socket (client, socket, ip,
port, initial_buffer, &error)) {
goto transfer_failed;
}
@ -876,7 +941,7 @@ transfer_failed:
{
GST_ERROR_OBJECT (server, "failed to accept client: %s", error->message);
g_error_free (error);
gst_object_unref (client);
g_object_unref (client);
return FALSE;
}
}
@ -935,7 +1000,7 @@ accept_failed:
{
GST_ERROR_OBJECT (server, "failed to accept client: %s", error->message);
g_error_free (error);
gst_object_unref (client);
g_object_unref (client);
return FALSE;
}
}