rtsp-server: port to GIO

Port to GIO
This commit is contained in:
Wim Taymans 2012-03-07 15:03:55 +01:00
parent f5c6572bbc
commit 4c59e211e2
7 changed files with 168 additions and 137 deletions

View file

@ -1435,7 +1435,7 @@ handle_data (GstRTSPClient * client, GstRTSPMessage * message)
buffer = gst_buffer_new ();
gst_buffer_take_memory (buffer, -1,
gst_memory_new_wrapped (0, data, g_free, size, 0, size));
gst_memory_new_wrapped (0, data, size, 0, size, data, g_free));
handled = FALSE;
for (walk = client->streams; walk; walk = g_list_next (walk)) {
@ -1890,9 +1890,11 @@ client_watch_notify (GstRTSPClient * client)
/**
* gst_rtsp_client_attach:
* @client: a #GstRTSPClient
* @channel: a #GIOChannel
* @socket: a #GSocket
* @cancellable: a #GCancellable
* @error: a #GError
*
* Accept a new connection for @client on the socket in @channel.
* Accept a new connection for @client on @socket.
*
* This function should be called when the client properties and urls are fully
* configured and the client is ready to start.
@ -1900,11 +1902,13 @@ client_watch_notify (GstRTSPClient * client)
* Returns: %TRUE if the client could be accepted.
*/
gboolean
gst_rtsp_client_accept (GstRTSPClient * client, GIOChannel * channel)
gst_rtsp_client_accept (GstRTSPClient * client, GSocket * socket,
GCancellable * cancellable, GError ** error)
{
int sock, fd;
GstRTSPConnection *conn;
GstRTSPResult res;
GSocket *read_socket;
GSocketAddress *addres;
GSource *source;
GMainContext *context;
GstRTSPUrl *url;
@ -1913,17 +1917,18 @@ gst_rtsp_client_accept (GstRTSPClient * client, GIOChannel * channel)
gchar ip[INET6_ADDRSTRLEN];
/* a new client connected. */
sock = g_io_channel_unix_get_fd (channel);
GST_RTSP_CHECK (gst_rtsp_connection_accept (socket, &conn, cancellable),
accept_failed);
GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
read_socket = gst_rtsp_connection_get_read_socket (conn);
client->is_ipv6 = g_socket_get_family (socket) == G_SOCKET_FAMILY_IPV6;
fd = gst_rtsp_connection_get_readfd (conn);
if (!(addres = g_socket_get_remote_address (read_socket, error)))
goto no_address;
addrlen = sizeof (addr);
if (getsockname (fd, (struct sockaddr *) &addr, &addrlen) < 0)
goto getpeername_failed;
client->is_ipv6 = addr.ss_family == AF_INET6;
if (!g_socket_address_to_native (addres, &addr, addrlen, error))
goto native_failed;
if (getnameinfo ((struct sockaddr *) &addr, addrlen, ip, sizeof (ip), NULL, 0,
NI_NUMERICHOST) != 0)
@ -1963,13 +1968,18 @@ accept_failed:
{
gchar *str = gst_rtsp_strresult (res);
GST_ERROR ("Could not accept client on server socket %d: %s", sock, str);
GST_ERROR ("Could not accept client on server socket %p: %s", socket, str);
g_free (str);
return FALSE;
}
getpeername_failed:
no_address:
{
GST_ERROR ("getpeername failed: %s", g_strerror (errno));
GST_ERROR ("could not get remote address %s", (*error)->message);
return FALSE;
}
native_failed:
{
GST_ERROR ("could not get native address %s", (*error)->message);
return FALSE;
}
getnameinfo_failed:

View file

@ -132,7 +132,9 @@ GstRTSPAuth * gst_rtsp_client_get_auth (GstRTSPClient *client);
gboolean gst_rtsp_client_accept (GstRTSPClient *client,
GIOChannel *channel);
GSocket *socket,
GCancellable *cancellable,
GError **error);
G_END_DECLS

View file

@ -166,7 +166,7 @@ gst_rtsp_media_factory_uri_init (GstRTSPMediaFactoryURI * factory)
factory->use_gstpay = DEFAULT_USE_GSTPAY;
/* get the feature list using the filter */
gst_default_registry_feature_filter ((GstPluginFeatureFilter)
gst_registry_feature_filter (gst_registry_get (), (GstPluginFeatureFilter)
payloader_filter, FALSE, &data);
/* sort */
factory->demuxers =

View file

@ -851,7 +851,8 @@ alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream)
GstElement *udpsink0, *udpsink1;
gint tmp_rtp, tmp_rtcp;
guint count;
gint rtpport, rtcpport, sockfd;
gint rtpport, rtcpport;
GSocket *socket;
const gchar *host;
udpsrc0 = NULL;
@ -944,9 +945,9 @@ again:
if (!udpsink0)
goto no_udp_protocol;
g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL);
g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL);
g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL);
g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
if (!udpsink1)
@ -968,9 +969,9 @@ again:
GST_WARNING ("multiudpsink version found without buffer-size property");
}
g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL);
g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL);
g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);

View file

@ -78,7 +78,7 @@ static void gst_rtsp_server_finalize (GObject * object);
static GstRTSPClient *default_create_client (GstRTSPServer * server);
static gboolean default_accept_client (GstRTSPServer * server,
GstRTSPClient * client, GIOChannel * channel);
GstRTSPClient * client, GSocket * socket, GError ** error);
static void
gst_rtsp_server_class_init (GstRTSPServerClass * klass)
@ -546,87 +546,94 @@ gst_rtsp_server_set_property (GObject * object, guint propid,
}
/**
* gst_rtsp_server_get_io_channel:
* gst_rtsp_server_create_socket:
* @server: a #GstRTSPServer
* @cancellable: a #GCancellable
* @error: a #GError
*
* Create a #GIOChannel for @server. The io channel will listen on the
* Create a #GSocket for @server. The socket will listen on the
* configured service.
*
* Returns: the GIOChannel for @server or NULL when an error occured.
* Returns: the #GSocket for @server or NULL when an error occured.
*/
GIOChannel *
gst_rtsp_server_get_io_channel (GstRTSPServer * server)
GSocket *
gst_rtsp_server_create_socket (GstRTSPServer * server,
GCancellable * cancellable, GError ** error)
{
GIOChannel *channel;
int ret, sockfd = -1;
struct addrinfo hints;
struct addrinfo *result, *rp;
GSocketConnectable *conn;
GSocketAddressEnumerator *enumerator;
GSocket *socket;
#ifdef USE_SOLINGER
struct linger linger;
#endif
GError *sock_error = NULL;
GError *bind_error = NULL;
guint16 port;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE | AI_CANONNAME; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
GST_RTSP_SERVER_LOCK (server);
GST_DEBUG_OBJECT (server, "getting address info of %s/%s", server->address,
server->service);
GST_RTSP_SERVER_LOCK (server);
/* resolve the server IP address */
if ((ret =
getaddrinfo (server->address, server->service, &hints, &result)) != 0)
goto no_address;
port = atoi (server->service);
if (port != 0)
conn = g_network_address_new (server->address, port);
else
conn = g_network_service_new (server->service, "tcp", server->address);
enumerator = g_socket_connectable_enumerate (conn);
g_object_unref (conn);
/* create server socket, we loop through all the addresses until we manage to
* create a socket and bind. */
for (rp = result; rp; rp = rp->ai_next) {
sockfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sockfd == -1) {
while (TRUE) {
GSocketAddress *sockaddr;
sockaddr =
g_socket_address_enumerator_next (enumerator, cancellable, error);
if (!sockaddr) {
GST_DEBUG_OBJECT (server, "no more addresses %s", (*error)->message);
break;
}
/* only keep the first error */
socket = g_socket_new (g_socket_address_get_family (sockaddr),
G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP,
sock_error ? NULL : &sock_error);
if (socket == NULL) {
GST_DEBUG_OBJECT (server, "failed to make socket (%s), try next",
g_strerror (errno));
sock_error->message);
continue;
}
/* make address reusable */
ret = 1;
if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR,
(void *) &ret, sizeof (ret)) < 0) {
/* warn but try to bind anyway */
GST_WARNING_OBJECT (server, "failed to reuse socker (%s)",
g_strerror (errno));
}
if (bind (sockfd, rp->ai_addr, rp->ai_addrlen) == 0) {
GST_DEBUG_OBJECT (server, "bind on %s", rp->ai_canonname);
if (g_socket_bind (socket, sockaddr, TRUE, bind_error ? NULL : &bind_error)) {
g_object_unref (sockaddr);
break;
}
GST_DEBUG_OBJECT (server, "failed to bind socket (%s), try next",
g_strerror (errno));
close (sockfd);
sockfd = -1;
bind_error->message);
g_object_unref (sockaddr);
g_object_unref (socket);
socket = NULL;
}
freeaddrinfo (result);
g_object_unref (enumerator);
if (sockfd == -1)
if (socket == NULL)
goto no_socket;
GST_DEBUG_OBJECT (server, "opened sending server socket with fd %d", sockfd);
g_clear_error (&sock_error);
g_clear_error (&bind_error);
GST_DEBUG_OBJECT (server, "opened sending server socket");
/* keep connection alive; avoids SIGPIPE during write */
ret = 1;
if (setsockopt (sockfd, SOL_SOCKET, SO_KEEPALIVE,
(void *) &ret, sizeof (ret)) < 0)
goto keepalive_failed;
g_socket_set_keepalive (socket, TRUE);
#if 0
#ifdef USE_SOLINGER
/* make sure socket is reset 5 seconds after close. This ensure that we can
* reuse the socket quickly while still having a chance to send data to the
@ -636,51 +643,32 @@ gst_rtsp_server_get_io_channel (GstRTSPServer * server)
if (setsockopt (sockfd, SOL_SOCKET, SO_LINGER,
(void *) &linger, sizeof (linger)) < 0)
goto linger_failed;
#endif
#endif
/* set the server socket to nonblocking */
fcntl (sockfd, F_SETFL, O_NONBLOCK);
g_socket_set_blocking (socket, FALSE);
GST_DEBUG_OBJECT (server, "listening on server socket %d with queue of %d",
sockfd, server->backlog);
if (listen (sockfd, server->backlog) == -1)
/* set listen backlog */
g_socket_set_listen_backlog (socket, server->backlog);
if (!g_socket_listen (socket, error))
goto listen_failed;
GST_DEBUG_OBJECT (server,
"listened on server socket %d, returning from connection setup", sockfd);
GST_DEBUG_OBJECT (server, "listening on server socket %p with queue of %d",
socket, server->backlog);
/* create IO channel for the socket */
#ifdef G_OS_WIN32
channel = g_io_channel_win32_new_socket (sockfd);
#else
channel = g_io_channel_unix_new (sockfd);
#endif
g_io_channel_set_close_on_unref (channel, TRUE);
GST_INFO_OBJECT (server, "listening on service %s", server->service);
GST_RTSP_SERVER_UNLOCK (server);
return channel;
return socket;
/* ERRORS */
no_address:
{
GST_ERROR_OBJECT (server, "failed to resolve address: %s",
gai_strerror (ret));
goto close_error;
}
no_socket:
{
GST_ERROR_OBJECT (server, "failed to create socket: %s",
g_strerror (errno));
goto close_error;
}
keepalive_failed:
{
GST_ERROR_OBJECT (server, "failed to configure keepalive socket: %s",
g_strerror (errno));
GST_ERROR_OBJECT (server, "failed to create socket");
goto close_error;
}
#if 0
#ifdef USE_SOLINGER
linger_failed:
{
@ -689,16 +677,29 @@ linger_failed:
goto close_error;
}
#endif
#endif
listen_failed:
{
GST_ERROR_OBJECT (server, "failed to listen on socket: %s",
g_strerror (errno));
(*error)->message);
goto close_error;
}
close_error:
{
if (sockfd >= 0) {
close (sockfd);
if (socket)
g_object_unref (socket);
if (sock_error) {
if (error == NULL)
g_propagate_error (error, sock_error);
else
g_error_free (sock_error);
}
if (bind_error) {
if (error == NULL)
g_propagate_error (error, bind_error);
else
g_error_free (bind_error);
}
GST_RTSP_SERVER_UNLOCK (server);
return NULL;
@ -759,12 +760,12 @@ default_create_client (GstRTSPServer * server)
* handle a client connection on this server */
static gboolean
default_accept_client (GstRTSPServer * server, GstRTSPClient * client,
GIOChannel * channel)
GSocket * socket, GError ** error)
{
/* accept connections for that client, this function returns after accepting
* the connection and will run the remainder of the communication with the
* client asyncronously. */
if (!gst_rtsp_client_accept (client, channel))
if (!gst_rtsp_client_accept (client, socket, NULL, error))
goto accept_failed;
return TRUE;
@ -773,29 +774,29 @@ default_accept_client (GstRTSPServer * server, GstRTSPClient * client,
accept_failed:
{
GST_ERROR_OBJECT (server,
"Could not accept client on server : %s (%d)", g_strerror (errno),
errno);
"Could not accept client on server : %s", (*error)->message);
return FALSE;
}
}
/**
* gst_rtsp_server_io_func:
* @channel: a #GIOChannel
* @socket: a #GSocket
* @condition: the condition on @source
*
* A default #GIOFunc that creates a new #GstRTSPClient to accept and handle a
* new connection on @channel or @server.
* A default #GSocketSourceFunc that creates a new #GstRTSPClient to accept and handle a
* new connection on @socket or @server.
*
* Returns: TRUE if the source could be connected, FALSE if an error occured.
*/
gboolean
gst_rtsp_server_io_func (GIOChannel * channel, GIOCondition condition,
gst_rtsp_server_io_func (GSocket * socket, GIOCondition condition,
GstRTSPServer * server)
{
gboolean result;
GstRTSPClient *client = NULL;
GstRTSPServerClass *klass;
GError *error = NULL;
if (condition & G_IO_IN) {
klass = GST_RTSP_SERVER_GET_CLASS (server);
@ -807,7 +808,7 @@ gst_rtsp_server_io_func (GIOChannel * channel, GIOCondition condition,
/* a new client connected, create a client object to handle the client. */
if (klass->accept_client)
result = klass->accept_client (server, client, channel);
result = klass->accept_client (server, client, socket, &error);
if (!result)
goto accept_failed;
@ -829,7 +830,8 @@ client_failed:
}
accept_failed:
{
GST_ERROR_OBJECT (server, "failed to accept client");
GST_ERROR_OBJECT (server, "failed to accept client: %s", error->message);
g_error_free (error);
gst_object_unref (client);
return FALSE;
}
@ -843,30 +845,39 @@ watch_destroyed (GstRTSPServer * server)
}
/**
* gst_rtsp_server_create_watch:
* gst_rtsp_server_create_source:
* @server: a #GstRTSPServer
* @cancellable: a #GCancellable or %NULL.
* @error: a #GError
*
* Create a #GSource for @server. The new source will have a default
* #GIOFunc of gst_rtsp_server_io_func().
* #GSocketSourceFunc of gst_rtsp_server_io_func().
*
* Returns: the #GSource for @server or NULL when an error occured.
* @cancellable if not NULL can be used to cancel the source, which will cause
* the source to trigger, reporting the current condition (which is likely 0
* unless cancellation happened at the same time as a condition change). You can
* check for this in the callback using g_cancellable_is_cancelled().
*
* Returns: the #GSource for @server or NULL when an error occured. Free with
* g_source_unref ()
*/
GSource *
gst_rtsp_server_create_watch (GstRTSPServer * server)
gst_rtsp_server_create_source (GstRTSPServer * server,
GCancellable * cancellable, GError ** error)
{
GIOChannel *channel;
GSocket *socket;
GSource *source;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
channel = gst_rtsp_server_get_io_channel (server);
if (channel == NULL)
goto no_channel;
socket = gst_rtsp_server_create_socket (server, NULL, error);
if (socket == NULL)
goto no_socket;
/* create a watch for reads (new connections) and possible errors */
source = g_io_create_watch (channel, G_IO_IN |
G_IO_ERR | G_IO_HUP | G_IO_NVAL);
g_io_channel_unref (channel);
source = g_socket_create_source (socket, G_IO_IN |
G_IO_ERR | G_IO_HUP | G_IO_NVAL, cancellable);
g_object_unref (socket);
/* configure the callback */
g_source_set_callback (source,
@ -875,9 +886,9 @@ gst_rtsp_server_create_watch (GstRTSPServer * server)
return source;
no_channel:
no_socket:
{
GST_ERROR_OBJECT (server, "failed to create IO channel");
GST_ERROR_OBJECT (server, "failed to create socket");
return NULL;
}
}
@ -886,6 +897,7 @@ no_channel:
* gst_rtsp_server_attach:
* @server: a #GstRTSPServer
* @context: a #GMainContext
* @error: a #GError
*
* Attaches @server to @context. When the mainloop for @context is run, the
* server will be dispatched. When @context is NULL, the default context will be
@ -901,10 +913,11 @@ gst_rtsp_server_attach (GstRTSPServer * server, GMainContext * context)
{
guint res;
GSource *source;
GError *error = NULL;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), 0);
source = gst_rtsp_server_create_watch (server);
source = gst_rtsp_server_create_source (server, NULL, &error);
if (source == NULL)
goto no_source;
@ -916,7 +929,8 @@ gst_rtsp_server_attach (GstRTSPServer * server, GMainContext * context)
/* ERRORS */
no_source:
{
GST_ERROR_OBJECT (server, "failed to create watch");
GST_ERROR_OBJECT (server, "failed to create watch: %s", error->message);
g_error_free (error);
return 0;
}
}

View file

@ -79,7 +79,7 @@ struct _GstRTSPServer {
* GstRTSPServerClass:
*
* @create_client: Create, configure a new GstRTSPClient
* object that handles the new connection on @channel.
* object that handles the new connection on @socket.
* @accept_client: accept a new GstRTSPClient
*
* The RTSP server class structure
@ -88,8 +88,8 @@ struct _GstRTSPServerClass {
GObjectClass parent_class;
GstRTSPClient * (*create_client) (GstRTSPServer *server);
gboolean (*accept_client) (GstRTSPServer *server, GstRTSPClient *client, GIOChannel *channel);
gboolean (*accept_client) (GstRTSPServer *server, GstRTSPClient *client,
GSocket *socket, GError **error);
/* signals */
void (*client_connected) (GstRTSPServer *server, GstRTSPClient *client);
};
@ -116,11 +116,15 @@ GstRTSPMediaMapping * gst_rtsp_server_get_media_mapping (GstRTSPServer *serve
void gst_rtsp_server_set_auth (GstRTSPServer *server, GstRTSPAuth *auth);
GstRTSPAuth * gst_rtsp_server_get_auth (GstRTSPServer *server);
gboolean gst_rtsp_server_io_func (GIOChannel *channel, GIOCondition condition,
gboolean gst_rtsp_server_io_func (GSocket *socket, GIOCondition condition,
GstRTSPServer *server);
GIOChannel * gst_rtsp_server_get_io_channel (GstRTSPServer *server);
GSource * gst_rtsp_server_create_watch (GstRTSPServer *server);
GSocket * gst_rtsp_server_create_socket (GstRTSPServer *server,
GCancellable *cancellable,
GError **error);
GSource * gst_rtsp_server_create_source (GstRTSPServer *server,
GCancellable * cancellable,
GError **error);
guint gst_rtsp_server_attach (GstRTSPServer *server,
GMainContext *context);

View file

@ -477,7 +477,7 @@ collect_timeout (gchar * sessionid, GstRTSPSession * sess, GstPoolSource * psrc)
gint timeout;
GTimeVal now;
g_source_get_current_time ((GSource *) psrc, &now);
g_get_current_time (&now);
timeout = gst_rtsp_session_next_timeout (sess, &now);
GST_INFO ("%p: next timeout: %d", sess, timeout);