rtsp-server: Add support for tunneling

Add support for tunneling over HTTP.
Use new connection methods to retrieve the url.
Dispatch messages based on the message type instead of blindly
assuming it's always a request.
Keep track of the watch id so that we can remove it later.
Set the media pipeline to NULL before unreffing the pipeline.
This commit is contained in:
Wim Taymans 2009-03-04 12:44:01 +01:00
parent daf27d2704
commit 2f8025dbdd
3 changed files with 116 additions and 9 deletions

View file

@ -26,6 +26,9 @@
#define DEFAULT_TIMEOUT 60 #define DEFAULT_TIMEOUT 60
static GMutex *tunnels_lock;
static GHashTable *tunnels;
enum enum
{ {
PROP_0, PROP_0,
@ -67,6 +70,9 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
g_param_spec_object ("media-mapping", "Media Mapping", g_param_spec_object ("media-mapping", "Media Mapping",
"The media mapping to use for client session", "The media mapping to use for client session",
GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
tunnels_lock = g_mutex_new ();
} }
static void static void
@ -486,6 +492,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
guint streamid; guint streamid;
GstRTSPSessionMedia *media; GstRTSPSessionMedia *media;
gboolean need_session; gboolean need_session;
GstRTSPUrl *url;
/* the uri contains the stream number we added in the SDP config, which is /* the uri contains the stream number we added in the SDP config, which is
* always /stream=%d so we need to strip that off * always /stream=%d so we need to strip that off
@ -535,7 +542,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
goto unsupported_transports; goto unsupported_transports;
supported = GST_RTSP_LOWER_TRANS_UDP | supported = GST_RTSP_LOWER_TRANS_UDP |
GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP; GST_RTSP_LOWER_TRANS_UDP_MCAST;
if (!(ct->lower_transport & supported)) if (!(ct->lower_transport & supported))
goto unsupported_transports; goto unsupported_transports;
@ -544,7 +551,8 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
/* we have a valid transport now, set the destination of the client. */ /* we have a valid transport now, set the destination of the client. */
g_free (ct->destination); g_free (ct->destination);
ct->destination = g_strdup (client->connection->url->host); url = gst_rtsp_connection_get_url (client->connection);
ct->destination = g_strdup (url->host);
if (session) { if (session) {
g_object_ref (session); g_object_ref (session);
@ -996,8 +1004,17 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
g_message ("client %p: received a message", client); g_message ("client %p: received a message", client);
handle_request (client, message); switch (message->type) {
case GST_RTSP_MESSAGE_REQUEST:
handle_request (client, message);
break;
case GST_RTSP_MESSAGE_RESPONSE:
break;
case GST_RTSP_MESSAGE_DATA:
break;
default:
break;
}
return GST_RTSP_OK; return GST_RTSP_OK;
} }
@ -1015,9 +1032,15 @@ static GstRTSPResult
closed (GstRTSPWatch *watch, gpointer user_data) closed (GstRTSPWatch *watch, gpointer user_data)
{ {
GstRTSPClient *client = GST_RTSP_CLIENT (user_data); GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
const gchar *tunnelid;
g_message ("client %p: connection closed", client); g_message ("client %p: connection closed", client);
tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
g_mutex_lock (tunnels_lock);
g_hash_table_remove (tunnels, tunnelid);
g_mutex_unlock (tunnels_lock);
return GST_RTSP_OK; return GST_RTSP_OK;
} }
@ -1034,11 +1057,90 @@ error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
return GST_RTSP_OK; return GST_RTSP_OK;
} }
static GstRTSPStatusCode
tunnel_start (GstRTSPWatch *watch, gpointer user_data)
{
GstRTSPClient *client;
const gchar *tunnelid;
client = GST_RTSP_CLIENT (user_data);
g_message ("client %p: tunnel start", client);
/* store client in the pending tunnels */
tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
g_message ("client %p: inserting %s", client, tunnelid);
/* we can't have two clients connecting with the same tunnelid */
g_mutex_lock (tunnels_lock);
if (g_hash_table_lookup (tunnels, tunnelid))
goto tunnel_existed;
g_hash_table_insert (tunnels, g_strdup (tunnelid), g_object_ref (client));
g_mutex_unlock (tunnels_lock);
return GST_RTSP_STS_OK;
/* ERRORS */
tunnel_existed:
{
g_mutex_unlock (tunnels_lock);
g_message ("client %p: tunnel session %s existed", client, tunnelid);
return GST_RTSP_STS_SERVICE_UNAVAILABLE;
}
}
static GstRTSPResult
tunnel_complete (GstRTSPWatch *watch, gpointer user_data)
{
const gchar *tunnelid;
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
GstRTSPClient *oclient;
g_message ("client %p: tunnel complete", client);
/* find previous tunnel */
tunnelid = gst_rtsp_connection_get_tunnelid (client->connection);
g_mutex_lock (tunnels_lock);
if (!(oclient = g_hash_table_lookup (tunnels, tunnelid)))
goto no_tunnel;
/* remove the old client from the table. ref before because removing it will
* remove the ref to it. */
g_object_ref (oclient);
g_hash_table_remove (tunnels, tunnelid);
g_mutex_unlock (tunnels_lock);
g_message ("client %p: found tunnel %p", client, oclient);
/* merge the tunnels into the first client */
gst_rtsp_connection_do_tunnel (oclient->connection, client->connection);
gst_rtsp_watch_reset (oclient->watch);
g_object_unref (oclient);
/* we don't need this watch anymore */
g_source_remove (client->watchid);
return GST_RTSP_OK;
/* ERRORS */
no_tunnel:
{
g_mutex_unlock (tunnels_lock);
g_message ("client %p: tunnel session %s not found", client, tunnelid);
return GST_RTSP_OK;
}
}
static GstRTSPWatchFuncs watch_funcs = { static GstRTSPWatchFuncs watch_funcs = {
message_received, message_received,
message_sent, message_sent,
closed, closed,
error error,
tunnel_start,
tunnel_complete
}; };
/** /**
@ -1061,14 +1163,16 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
GstRTSPResult res; GstRTSPResult res;
GSource *source; GSource *source;
GMainContext *context; GMainContext *context;
GstRTSPUrl *url;
/* a new client connected. */ /* a new client connected. */
sock = g_io_channel_unix_get_fd (channel); sock = g_io_channel_unix_get_fd (channel);
GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed); GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
g_message ("added new client %p ip %s:%d with fd %d", client, url = gst_rtsp_connection_get_url (conn);
conn->url->host, conn->url->port, conn->fd.fd); g_message ("added new client %p ip %s:%d", client,
url->host, url->port);
client->connection = conn; client->connection = conn;
@ -1084,7 +1188,7 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
g_message ("attaching to context %p", context); g_message ("attaching to context %p", context);
gst_rtsp_watch_attach (client->watch, context); client->watchid = gst_rtsp_watch_attach (client->watch, context);
gst_rtsp_watch_unref (client->watch); gst_rtsp_watch_unref (client->watch);
return TRUE; return TRUE;

View file

@ -71,6 +71,7 @@ struct _GstRTSPClient {
GstRTSPConnection *connection; GstRTSPConnection *connection;
GstRTSPWatch *watch; GstRTSPWatch *watch;
guint watchid;
guint timeout; guint timeout;
GstRTSPSessionPool *session_pool; GstRTSPSessionPool *session_pool;

View file

@ -110,8 +110,10 @@ gst_rtsp_media_finalize (GObject * obj)
g_source_unref (media->source); g_source_unref (media->source);
} }
if (media->pipeline) if (media->pipeline) {
gst_element_set_state (media->pipeline, GST_STATE_NULL);
gst_object_unref (media->pipeline); gst_object_unref (media->pipeline);
}
G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj); G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj);
} }