rtsp: use RTCP to keep the session alive

Use the RTCP rtcp-from stats field to find the associated session and use this
to keep the session alive.
This commit is contained in:
Wim Taymans 2009-05-26 19:01:10 +02:00
parent 7bbdf7bf97
commit 9bed89c3b7
5 changed files with 288 additions and 139 deletions

View file

@ -35,13 +35,14 @@ enum
PROP_LAST
};
static void gst_rtsp_client_get_property (GObject *object, guint propid,
GValue *value, GParamSpec *pspec);
static void gst_rtsp_client_set_property (GObject *object, guint propid,
const GValue *value, GParamSpec *pspec);
static void gst_rtsp_client_get_property (GObject * object, guint propid,
GValue * value, GParamSpec * pspec);
static void gst_rtsp_client_set_property (GObject * object, guint propid,
const GValue * value, GParamSpec * pspec);
static void gst_rtsp_client_finalize (GObject * obj);
static void client_session_finalized (GstRTSPClient *client, GstRTSPSession *session);
static void client_session_finalized (GstRTSPClient * client,
GstRTSPSession * session);
G_DEFINE_TYPE (GstRTSPClient, gst_rtsp_client, G_TYPE_OBJECT);
@ -59,14 +60,17 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass)
g_object_class_install_property (gobject_class, PROP_SESSION_POOL,
g_param_spec_object ("session-pool", "Session Pool",
"The session pool to use for client session",
GST_TYPE_RTSP_SESSION_POOL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_TYPE_RTSP_SESSION_POOL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MEDIA_MAPPING,
g_param_spec_object ("media-mapping", "Media Mapping",
"The media mapping to use for client session",
GST_TYPE_RTSP_MEDIA_MAPPING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_TYPE_RTSP_MEDIA_MAPPING,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
tunnels =
g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
tunnels_lock = g_mutex_new ();
}
@ -100,8 +104,8 @@ gst_rtsp_client_finalize (GObject * obj)
}
static void
gst_rtsp_client_get_property (GObject *object, guint propid,
GValue *value, GParamSpec *pspec)
gst_rtsp_client_get_property (GObject * object, guint propid,
GValue * value, GParamSpec * pspec)
{
GstRTSPClient *client = GST_RTSP_CLIENT (object);
@ -118,8 +122,8 @@ gst_rtsp_client_get_property (GObject *object, guint propid,
}
static void
gst_rtsp_client_set_property (GObject *object, guint propid,
const GValue *value, GParamSpec *pspec)
gst_rtsp_client_set_property (GObject * object, guint propid,
const GValue * value, GParamSpec * pspec)
{
GstRTSPClient *client = GST_RTSP_CLIENT (object);
@ -151,9 +155,11 @@ gst_rtsp_client_new (void)
}
static void
send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *response)
send_response (GstRTSPClient * client, GstRTSPSession * session,
GstRTSPMessage * response)
{
gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER, "GStreamer RTSP server");
gst_rtsp_message_add_header (response, GST_RTSP_HDR_SERVER,
"GStreamer RTSP server");
/* remove any previous header */
gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
@ -163,13 +169,14 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
gchar *str;
if (session->timeout != 60)
str = g_strdup_printf ("%s; timeout=%d", session->sessionid, session->timeout);
str =
g_strdup_printf ("%s; timeout=%d", session->sessionid,
session->timeout);
else
str = g_strdup (session->sessionid);
gst_rtsp_message_take_header (response, GST_RTSP_HDR_SESSION, str);
}
#ifdef DEBUG
gst_rtsp_message_dump (response);
#endif
@ -179,19 +186,19 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
}
static void
send_generic_response (GstRTSPClient *client, GstRTSPStatusCode code,
GstRTSPMessage *request)
send_generic_response (GstRTSPClient * client, GstRTSPStatusCode code,
GstRTSPMessage * request)
{
GstRTSPMessage response = { 0 };
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
send_response (client, NULL, &response);
}
static gboolean
compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2)
compare_uri (const GstRTSPUrl * uri1, const GstRTSPUrl * uri2)
{
if (uri1 == NULL || uri2 == NULL)
return FALSE;
@ -206,7 +213,7 @@ compare_uri (const GstRTSPUrl *uri1, const GstRTSPUrl *uri2)
* but is cached for when the same client (without breaking the connection) is
* doing a setup for the exact same url. */
static GstRTSPMedia *
find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
find_media (GstRTSPClient * client, GstRTSPUrl * uri, GstRTSPMessage * request)
{
GstRTSPMediaFactory *factory;
GstRTSPMedia *media;
@ -225,7 +232,8 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
goto no_mapping;
/* find the factory for the uri first */
if (!(factory = gst_rtsp_media_mapping_find_factory (client->media_mapping, uri)))
if (!(factory =
gst_rtsp_media_mapping_find_factory (client->media_mapping, uri)))
goto no_factory;
/* prepare the media and add it to the pipeline */
@ -239,11 +247,10 @@ find_media (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPMessage *request)
/* now keep track of the uri and the media */
client->uri = gst_rtsp_url_copy (uri);
client->media = media;
}
else {
} else {
/* we have seen this uri before, used cached media */
media = client->media;
g_message ("reusing cached media %p", media);
g_message ("reusing cached media %p", media);
}
if (media)
@ -278,7 +285,7 @@ no_prepare:
}
static gboolean
do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
{
GstRTSPMessage message = { 0 };
guint8 *data;
@ -299,38 +306,36 @@ do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
}
static void
link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
link_stream (GstRTSPClient * client, GstRTSPSessionStream * stream)
{
gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, client, NULL);
(GstRTSPSendFunc) do_send_data, client, NULL);
client->streams = g_list_prepend (client->streams, stream);
}
static void
unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
unlink_stream (GstRTSPClient * client, GstRTSPSessionStream * stream)
{
gst_rtsp_session_stream_set_callbacks (stream, NULL,
NULL, NULL, NULL);
gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
client->streams = g_list_remove (client->streams, stream);
}
static void
unlink_streams (GstRTSPClient *client)
unlink_streams (GstRTSPClient * client)
{
GList *walk;
for (walk = client->streams; walk; walk = g_list_next (walk)) {
GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
gst_rtsp_session_stream_set_callbacks (stream, NULL,
NULL, NULL, NULL);
gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
}
g_list_free (client->streams);
client->streams = NULL;
}
static void
unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
unlink_session_streams (GstRTSPClient * client, GstRTSPSessionMedia * media)
{
guint n_streams, i;
@ -353,7 +358,8 @@ unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
}
static gboolean
handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_teardown_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPSessionMedia *media;
GstRTSPMessage response = { 0 };
@ -371,7 +377,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
unlink_session_streams (client, media);
/* remove the session from the watched sessions */
g_object_weak_unref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client);
g_object_weak_unref (G_OBJECT (session),
(GWeakNotify) client_session_finalized, client);
client->sessions = g_list_remove (client->sessions, session);
gst_rtsp_session_media_set_state (media, GST_STATE_NULL);
@ -384,7 +391,8 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
}
/* construct the response now */
code = GST_RTSP_STS_OK;
gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
send_response (client, session, &response);
@ -404,7 +412,8 @@ not_found:
}
static gboolean
handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_pause_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPSessionMedia *media;
GstRTSPMessage response = { 0 };
@ -431,7 +440,8 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
/* construct the response now */
code = GST_RTSP_STS_OK;
gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
send_response (client, session, &response);
@ -453,13 +463,15 @@ not_found:
}
invalid_state:
{
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request);
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
request);
return FALSE;
}
}
static gboolean
handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_play_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPSessionMedia *media;
GstRTSPMessage response = { 0 };
@ -508,8 +520,10 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
/* get the stream as configured in the session */
sstream = gst_rtsp_session_media_get_stream (media, i);
/* get the transport, if there is no transport configured, skip this stream */
if (!(tr = sstream->trans.transport))
if (!(tr = sstream->trans.transport)) {
g_message ("stream %d is not configured", i);
continue;
}
if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* for TCP, link the stream to the TCP connection of the client */
@ -521,7 +535,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
if (g_object_class_find_property (payobjclass, "seqnum") &&
g_object_class_find_property (payobjclass, "timestamp")) {
g_object_class_find_property (payobjclass, "timestamp")) {
GObject *payobj;
payobj = G_OBJECT (stream->payloader);
@ -533,26 +547,26 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
g_string_append (rtpinfo, ", ");
uristr = gst_rtsp_url_get_request_uri (uri);
g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u", uristr, i, seqnum, timestamp);
g_string_append_printf (rtpinfo, "url=%s/stream=%d;seq=%u;rtptime=%u",
uristr, i, seqnum, timestamp);
g_free (uristr);
infocount++;
}
else {
} else {
g_warning ("RTP-Info cannot be determined for stream %d", i);
}
}
/* construct the response now */
code = GST_RTSP_STS_OK;
gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
/* add the RTP-Info header */
if (infocount > 0) {
str = g_string_free (rtpinfo, FALSE);
gst_rtsp_message_take_header (&response, GST_RTSP_HDR_RTP_INFO, str);
}
else {
} else {
g_string_free (rtpinfo, TRUE);
}
@ -582,13 +596,22 @@ not_found:
}
invalid_state:
{
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, request);
send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE,
request);
return FALSE;
}
}
static void
do_keepalive (GstRTSPSession * session)
{
g_message ("keep session %p alive", session);
gst_rtsp_session_touch (session);
}
static gboolean
handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_setup_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPResult res;
gchar *transport;
@ -623,18 +646,20 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
goto bad_request;
/* parse the transport */
res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport, 0);
res =
gst_rtsp_message_get_header (request, GST_RTSP_HDR_TRANSPORT, &transport,
0);
if (res != GST_RTSP_OK)
goto no_transport;
transports = g_strsplit (transport, ",", 0);
gst_rtsp_transport_new (&ct);
gst_rtsp_transport_new (&ct);
/* loop through the transports, try to parse */
have_transport = FALSE;
for (i = 0; transports[i]; i++) {
gst_rtsp_transport_init (ct);
gst_rtsp_transport_init (ct);
res = gst_rtsp_transport_parse (transports[i], ct);
if (res == GST_RTSP_OK) {
have_transport = TRUE;
@ -644,7 +669,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
g_strfreev (transports);
/* we have not found anything usable, error out */
if (!have_transport)
if (!have_transport)
goto unsupported_transports;
/* we have a valid transport, check if we can handle it */
@ -654,8 +679,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
goto unsupported_transports;
supported = GST_RTSP_LOWER_TRANS_UDP |
GST_RTSP_LOWER_TRANS_UDP_MCAST |
GST_RTSP_LOWER_TRANS_TCP;
GST_RTSP_LOWER_TRANS_UDP_MCAST | GST_RTSP_LOWER_TRANS_TCP;
if (!(ct->lower_transport & supported))
goto unsupported_transports;
@ -674,8 +698,7 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
media = gst_rtsp_session_get_media (session, uri);
need_session = FALSE;
}
else {
} else {
/* create a session if this fails we probably reached our session limit or
* something. */
if (!(session = gst_rtsp_session_pool_create (client->session_pool)))
@ -708,13 +731,18 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
st = gst_rtsp_session_stream_set_transport (stream, ct);
/* configure keepalive for this transport */
gst_rtsp_session_stream_set_keepalive (stream,
(GstRTSPKeepAliveFunc) do_keepalive, session, NULL);
/* serialize the server transport */
trans_str = gst_rtsp_transport_as_text (st);
gst_rtsp_transport_free (st);
/* construct the response now */
code = GST_RTSP_STS_OK;
gst_rtsp_message_init_response (&response, code, gst_rtsp_status_as_text (code), request);
gst_rtsp_message_init_response (&response, code,
gst_rtsp_status_as_text (code), request);
gst_rtsp_message_add_header (&response, GST_RTSP_HDR_TRANSPORT, trans_str);
g_free (trans_str);
@ -763,7 +791,7 @@ no_transport:
unsupported_transports:
{
send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_TRANSPORT, request);
gst_rtsp_transport_free (ct);
gst_rtsp_transport_free (ct);
return FALSE;
}
no_pool:
@ -780,7 +808,8 @@ service_unavailable:
/* for the describe we must generate an SDP */
static gboolean
handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_describe_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPMessage response = { 0 };
GstRTSPResult res;
@ -791,10 +820,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
/* check what kind of format is accepted, we don't really do anything with it
* and always return SDP for now. */
for (i = 0; i++; ) {
for (i = 0; i++;) {
gchar *accept;
res = gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i);
res =
gst_rtsp_message_get_header (request, GST_RTSP_HDR_ACCEPT, &accept, i);
if (res == GST_RTSP_ENOTIMPL)
break;
@ -812,10 +842,11 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
g_object_unref (media);
gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE, "application/sdp");
gst_rtsp_message_add_header (&response, GST_RTSP_HDR_CONTENT_TYPE,
"application/sdp");
/* content base for some clients that might screw up creating the setup uri */
str = g_strdup_printf ("rtsp://%s:%u%s/", uri->host, uri->port, uri->abspath);
@ -824,7 +855,7 @@ handle_describe_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
/* add SDP to the response body */
str = gst_sdp_message_as_text (sdp);
gst_rtsp_message_take_body (&response, (guint8 *)str, strlen (str));
gst_rtsp_message_take_body (&response, (guint8 *) str, strlen (str));
gst_sdp_message_free (sdp);
send_response (client, session, &response);
@ -846,25 +877,24 @@ no_sdp:
}
static void
handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
handle_options_request (GstRTSPClient * client, GstRTSPUrl * uri,
GstRTSPSession * session, GstRTSPMessage * request)
{
GstRTSPMessage response = { 0 };
GstRTSPMethod options;
gchar *str;
options = GST_RTSP_DESCRIBE |
GST_RTSP_OPTIONS |
GST_RTSP_PAUSE |
GST_RTSP_PLAY |
GST_RTSP_SETUP |
GST_RTSP_GET_PARAMETER |
GST_RTSP_SET_PARAMETER |
GST_RTSP_TEARDOWN;
GST_RTSP_OPTIONS |
GST_RTSP_PAUSE |
GST_RTSP_PLAY |
GST_RTSP_SETUP |
GST_RTSP_GET_PARAMETER | GST_RTSP_SET_PARAMETER | GST_RTSP_TEARDOWN;
str = gst_rtsp_options_as_text (options);
gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
gst_rtsp_message_init_response (&response, GST_RTSP_STS_OK,
gst_rtsp_status_as_text (GST_RTSP_STS_OK), request);
gst_rtsp_message_add_header (&response, GST_RTSP_HDR_PUBLIC, str);
g_free (str);
@ -874,7 +904,7 @@ handle_options_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *
/* remove duplicate and trailing '/' */
static void
santize_uri (GstRTSPUrl *uri)
santize_uri (GstRTSPUrl * uri)
{
gint i, len;
gchar *s, *d;
@ -894,22 +924,22 @@ santize_uri (GstRTSPUrl *uri)
}
len = d - uri->abspath;
/* don't remove the first slash if that's the only thing left */
if (len > 1 && *(d-1) == '/')
if (len > 1 && *(d - 1) == '/')
d--;
*d = '\0';
}
static void
client_session_finalized (GstRTSPClient *client, GstRTSPSession *session)
client_session_finalized (GstRTSPClient * client, GstRTSPSession * session)
{
if (!(client->sessions = g_list_remove (client->sessions, session))) {
g_message ("all sessions finalized, close the connection");
g_source_destroy ((GSource*)client->watch);
g_source_destroy ((GSource *) client->watch);
}
}
static void
client_watch_session (GstRTSPClient *client, GstRTSPSession *session)
client_watch_session (GstRTSPClient * client, GstRTSPSession * session)
{
GList *walk;
@ -923,12 +953,13 @@ client_watch_session (GstRTSPClient *client, GstRTSPSession *session)
g_message ("watching session %p", session);
g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized, client);
g_object_weak_ref (G_OBJECT (session), (GWeakNotify) client_session_finalized,
client);
client->sessions = g_list_prepend (client->sessions, session);
}
static void
handle_request (GstRTSPClient *client, GstRTSPMessage *request)
handle_request (GstRTSPClient * client, GstRTSPMessage * request)
{
GstRTSPMethod method;
const gchar *uristr;
@ -948,7 +979,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
if (version != GST_RTSP_VERSION_1_0) {
/* we can only handle 1.0 requests */
send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED, request);
send_generic_response (client, GST_RTSP_STS_RTSP_VERSION_NOT_SUPPORTED,
request);
return;
}
@ -975,8 +1007,7 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
* disappears because it times out, we will be notified. If all sessions are
* gone, we will close the connection */
client_watch_session (client, session);
}
else
} else
session = NULL;
/* now see what is asked and dispatch to a dedicated handler */
@ -1033,7 +1064,7 @@ session_not_found:
}
static void
handle_data (GstRTSPClient *client, GstRTSPMessage *message)
handle_data (GstRTSPClient * client, GstRTSPMessage * message)
{
GstRTSPResult res;
guint8 channel;
@ -1043,7 +1074,7 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
GstBuffer *buffer;
gboolean handled;
/* find the stream for this message */
/* find the stream for this message */
res = gst_rtsp_message_parse_data (message, &channel);
if (res != GST_RTSP_OK)
return;
@ -1073,12 +1104,12 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* dispatch to the stream based on the channel number */
if (tr->interleaved.min == channel) {
gst_rtsp_media_stream_rtp (mstream, buffer);
handled = TRUE;
gst_rtsp_media_stream_rtp (mstream, buffer);
handled = TRUE;
break;
} else if (tr->interleaved.max == channel) {
gst_rtsp_media_stream_rtcp (mstream, buffer);
handled = TRUE;
gst_rtsp_media_stream_rtcp (mstream, buffer);
handled = TRUE;
break;
}
}
@ -1097,7 +1128,8 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
* that created the client but can be overridden later.
*/
void
gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *pool)
gst_rtsp_client_set_session_pool (GstRTSPClient * client,
GstRTSPSessionPool * pool)
{
GstRTSPSessionPool *old;
@ -1120,7 +1152,7 @@ gst_rtsp_client_set_session_pool (GstRTSPClient *client, GstRTSPSessionPool *poo
* Returns: a #GstRTSPSessionPool, unref after usage.
*/
GstRTSPSessionPool *
gst_rtsp_client_get_session_pool (GstRTSPClient *client)
gst_rtsp_client_get_session_pool (GstRTSPClient * client)
{
GstRTSPSessionPool *result;
@ -1140,7 +1172,8 @@ gst_rtsp_client_get_session_pool (GstRTSPClient *client)
* created the client but can be overriden later.
*/
void
gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *mapping)
gst_rtsp_client_set_media_mapping (GstRTSPClient * client,
GstRTSPMediaMapping * mapping)
{
GstRTSPMediaMapping *old;
@ -1164,7 +1197,7 @@ gst_rtsp_client_set_media_mapping (GstRTSPClient *client, GstRTSPMediaMapping *m
* Returns: a #GstRTSPMediaMapping, unref after usage.
*/
GstRTSPMediaMapping *
gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
gst_rtsp_client_get_media_mapping (GstRTSPClient * client)
{
GstRTSPMediaMapping *result;
@ -1175,7 +1208,8 @@ gst_rtsp_client_get_media_mapping (GstRTSPClient *client)
}
static GstRTSPResult
message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_data)
message_received (GstRTSPWatch * watch, GstRTSPMessage * message,
gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
@ -1195,7 +1229,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
}
static GstRTSPResult
message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data)
message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
@ -1205,7 +1239,7 @@ message_sent (GstRTSPWatch *watch, guint cseq, gpointer user_data)
}
static GstRTSPResult
closed (GstRTSPWatch *watch, gpointer user_data)
closed (GstRTSPWatch * watch, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
const gchar *tunnelid;
@ -1225,7 +1259,7 @@ closed (GstRTSPWatch *watch, gpointer user_data)
}
static GstRTSPResult
error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
error (GstRTSPWatch * watch, GstRTSPResult result, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
gchar *str;
@ -1238,7 +1272,7 @@ error (GstRTSPWatch *watch, GstRTSPResult result, gpointer user_data)
}
static GstRTSPStatusCode
tunnel_start (GstRTSPWatch *watch, gpointer user_data)
tunnel_start (GstRTSPWatch * watch, gpointer user_data)
{
GstRTSPClient *client;
const gchar *tunnelid;
@ -1272,7 +1306,7 @@ tunnel_existed:
}
static GstRTSPResult
tunnel_complete (GstRTSPWatch *watch, gpointer user_data)
tunnel_complete (GstRTSPWatch * watch, gpointer user_data)
{
const gchar *tunnelid;
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
@ -1336,7 +1370,7 @@ static GstRTSPWatchFuncs watch_funcs = {
* Returns: %TRUE if the client could be accepted.
*/
gboolean
gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
gst_rtsp_client_accept (GstRTSPClient * client, GIOChannel * channel)
{
int sock;
GstRTSPConnection *conn;
@ -1351,14 +1385,13 @@ gst_rtsp_client_accept (GstRTSPClient *client, GIOChannel *channel)
GST_RTSP_CHECK (gst_rtsp_connection_accept (sock, &conn), accept_failed);
url = gst_rtsp_connection_get_url (conn);
g_message ("added new client %p ip %s:%d", client,
url->host, url->port);
g_message ("added new client %p ip %s:%d", client, url->host, url->port);
client->connection = conn;
/* create watch for the connection and attach */
client->watch = gst_rtsp_watch_new (client->connection, &watch_funcs,
g_object_ref (client), g_object_unref);
g_object_ref (client), g_object_unref);
/* find the context to add the watch */
if ((source = g_main_current_source ()))
@ -1378,8 +1411,7 @@ accept_failed:
{
gchar *str = gst_rtsp_strresult (res);
g_error ("Could not accept client on server socket %d: %s",
sock, str);
g_error ("Could not accept client on server socket %d: %s", sock, str);
g_free (str);
return FALSE;
}

View file

@ -17,6 +17,8 @@
* Boston, MA 02111-1307, USA.
*/
#include <string.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
@ -39,6 +41,8 @@ enum
SIGNAL_LAST
};
static GQuark ssrc_stream_map_key;
static void gst_rtsp_media_get_property (GObject *object, guint propid,
GValue *value, GParamSpec *pspec);
static void gst_rtsp_media_set_property (GObject *object, guint propid,
@ -87,6 +91,8 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
g_critical ("could not start bus thread: %s", error->message);
}
klass->handle_message = default_handle_message;
ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
}
static void
@ -106,6 +112,8 @@ gst_rtsp_media_stream_free (GstRTSPMediaStream *stream)
if (stream->caps)
gst_caps_unref (stream->caps);
g_list_free (stream->transports);
g_free (stream);
}
@ -700,48 +708,112 @@ dump_structure (const GstStructure *s)
g_free (sstr);
}
static void
on_new_ssrc (GObject *session, GObject *source, GstRTSPMedia *media)
static GstRTSPMediaTrans *
find_transport (GstRTSPMediaStream *stream, const gchar *rtcp_from)
{
g_message ("%p: new source %p", media, source);
GList *walk;
GstRTSPMediaTrans *result = NULL;
const gchar *dest;
guint port;
if (rtcp_from == NULL)
return NULL;
dest = g_strrstr (rtcp_from, ":");
if (dest == NULL)
return NULL;
port = atoi (dest + 1);
dest = g_strndup (rtcp_from, dest - rtcp_from);
g_message ("finding %s:%d", dest, port);
for (walk = stream->transports; walk; walk = g_list_next (walk)) {
GstRTSPMediaTrans *trans = walk->data;
gint min, max;
min = trans->transport->client_port.min;
max = trans->transport->client_port.max;
if ((strcmp (trans->transport->destination, dest) == 0) && (min == port || max == port)) {
result = trans;
break;
}
}
return result;
}
static void
on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMedia *media)
on_new_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
GstStructure *stats;
GstRTSPMediaTrans *trans;
g_message ("%p: new source %p", stream, source);
/* see if we have a stream to match with the origin of the RTCP packet */
trans = g_object_get_qdata (source, ssrc_stream_map_key);
if (trans == NULL) {
g_object_get (source, "stats", &stats, NULL);
if (stats) {
const gchar *rtcp_from;
rtcp_from = gst_structure_get_string (stats, "rtcp-from");
if ((trans = find_transport (stream, rtcp_from))) {
g_message ("%p: found transport %p for source %p", stream, trans, source);
g_object_set_qdata (source, ssrc_stream_map_key, trans);
}
}
} else {
g_message ("%p: source %p for transport %p", stream, source, trans);
}
}
static void
on_ssrc_sdes (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
GstStructure *sdes;
g_message ("%p: new SDES %p", media, source);
g_message ("%p: new SDES %p", stream, source);
g_object_get (source, "sdes", &sdes, NULL);
dump_structure (sdes);
}
static void
on_ssrc_active (GObject *session, GObject *source, GstRTSPMedia *media)
on_ssrc_active (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
GstStructure *stats;
GstRTSPMediaTrans *trans;
trans = g_object_get_qdata (source, ssrc_stream_map_key);
g_message ("%p: source %p in transport %p is active", stream, trans, source);
if (trans && trans->keep_alive) {
trans->keep_alive (trans->ka_user_data);
}
g_message ("%p: source %p is active", media, source);
g_object_get (source, "stats", &stats, NULL);
dump_structure (stats);
gst_structure_free (stats);
}
static void
on_bye_ssrc (GObject *session, GObject *source, GstRTSPMedia *media)
on_bye_ssrc (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
g_message ("%p: source %p bye", media, source);
g_message ("%p: source %p bye", stream, source);
}
static void
on_bye_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
on_bye_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
g_message ("%p: source %p bye timeout", media, source);
g_message ("%p: source %p bye timeout", stream, source);
}
static void
on_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
on_timeout (GObject *session, GObject *source, GstRTSPMediaStream *stream)
{
g_message ("%p: source %p timeout", media, source);
g_message ("%p: source %p timeout", stream, source);
}
static GstFlowReturn
@ -836,17 +908,17 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
&stream->session);
g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
media);
stream);
g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
media);
stream);
g_signal_connect (stream->session, "on-ssrc-active", (GCallback) on_ssrc_active,
media);
stream);
g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
media);
stream);
g_signal_connect (stream->session, "on-bye-timeout", (GCallback) on_bye_timeout,
media);
stream);
g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
media);
stream);
/* link the RTP pad to the session manager */
ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
@ -1361,12 +1433,14 @@ gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transport
g_message ("adding %s:%d-%d", dest, min, max);
g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
stream->transports = g_list_prepend (stream->transports, tr);
tr->active = TRUE;
media->active++;
} else if (remove && tr->active) {
g_message ("removing %s:%d-%d", dest, min, max);
g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
stream->transports = g_list_remove (stream->transports, tr);
tr->active = FALSE;
media->active--;
}

View file

@ -41,7 +41,8 @@ typedef struct _GstRTSPMedia GstRTSPMedia;
typedef struct _GstRTSPMediaClass GstRTSPMediaClass;
typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans;
typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data);
/**
* GstRTSPMediaTrans:
@ -50,20 +51,33 @@ typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer
* @send_rtcp: callback for sending RTCP messages
* @user_data: user data passed in the callbacks
* @notify: free function for the user_data.
* @keep_alive: keep alive callback
* @ka_user_data: data passed to @keep_alive
* @ka_notify: called when @ka_user_data is freed
* @active: if we are actively sending
* @timeout: if we timed out
* @transport: a transport description
* @rtpsource: the receiver rtp source object
*
* A Transport description for stream @idx
*/
struct _GstRTSPMediaTrans {
guint idx;
GstRTSPSendFunc send_rtp;
GstRTSPSendFunc send_rtcp;
gpointer user_data;
GDestroyNotify notify;
gboolean active;
GstRTSPSendFunc send_rtp;
GstRTSPSendFunc send_rtcp;
gpointer user_data;
GDestroyNotify notify;
GstRTSPTransport *transport;
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
gboolean active;
gboolean timeout;
GstRTSPTransport *transport;
GObject *rtpsource;
};
/**

View file

@ -74,6 +74,10 @@ gst_rtsp_session_free_stream (GstRTSPSessionStream *stream)
{
g_message ("free session stream %p", stream);
/* remove callbacks now */
gst_rtsp_session_stream_set_callbacks (stream, NULL, NULL, NULL, NULL);
gst_rtsp_session_stream_set_keepalive (stream, NULL, NULL, NULL);
if (stream->trans.transport)
gst_rtsp_transport_free (stream->trans.transport);
@ -308,7 +312,7 @@ gst_rtsp_session_media_get_stream (GstRTSPSessionMedia *media, guint idx)
result->trans.transport = NULL;
result->media_stream = media_stream;
g_array_insert_val (media->streams, idx, result);
g_array_index (media->streams, GstRTSPSessionStream *, idx) = result;
}
return result;
@ -512,6 +516,27 @@ gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream,
stream->trans.notify = notify;
}
/**
* gst_rtsp_session_stream_set_keepalive:
* @stream: a #GstRTSPSessionStream
* @keep_alive: a callback called when the receiver is active
* @user_data: user data passed to callback
* @notify: called with the user_data when no longer needed.
*
* Install callbacks that will be called when RTCP packets are received from the
* receiver of @stream.
*/
void
gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream,
GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify notify)
{
stream->trans.keep_alive = keep_alive;
if (stream->trans.ka_notify)
stream->trans.ka_notify (stream->trans.ka_user_data);
stream->trans.ka_user_data = user_data;
stream->trans.ka_notify = notify;
}
/**
* gst_rtsp_session_media_set_state:
* @media: a #GstRTSPSessionMedia

View file

@ -146,6 +146,10 @@ void gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStre
GstRTSPSendFunc send_rtcp,
gpointer user_data,
GDestroyNotify notify);
void gst_rtsp_session_stream_set_keepalive (GstRTSPSessionStream *stream,
GstRTSPKeepAliveFunc keep_alive,
gpointer user_data,
GDestroyNotify notify);
G_END_DECLS