client: simplify session transport handling

link/unlink of the transport in a session was done to keep track of all
TCP transports and to send RTP/RTCP data to the streams. We can simplify
that by putting all the TCP transports in a hashtable indexed with the
channel number.

We also don't need to link/unlink the transports when we pause/resume
the streams. The same effect is already achieved when we pause/play the
media. Indeed, when we pause the media, the transport is removed from
the media and the callbacks will not be called anymore.

See https://bugzilla.gnome.org/show_bug.cgi?id=736041
This commit is contained in:
Wim Taymans 2014-09-09 18:11:39 +02:00
parent ea5d4cfc7e
commit 0292be09ea

View file

@ -82,7 +82,7 @@ struct _GstRTSPClientPrivate
gchar *path;
GstRTSPMedia *media;
GList *transports;
GHashTable *transports;
GList *sessions;
guint sessions_cookie;
@ -134,8 +134,6 @@ static void gst_rtsp_client_set_property (GObject * object, guint propid,
static void gst_rtsp_client_finalize (GObject * obj);
static GstSDPMessage *create_sdp (GstRTSPClient * client, GstRTSPMedia * media);
static void unlink_session_transports (GstRTSPClient * client,
GstRTSPSession * session, GstRTSPSessionMedia * sessmedia);
static gboolean default_configure_client_media (GstRTSPClient * client,
GstRTSPMedia * media, GstRTSPStream * stream, GstRTSPContext * ctx);
static gboolean default_configure_client_transport (GstRTSPClient * client,
@ -282,18 +280,15 @@ gst_rtsp_client_init (GstRTSPClient * client)
g_mutex_init (&priv->watch_lock);
priv->close_seq = 0;
priv->drop_backlog = DEFAULT_DROP_BACKLOG;
priv->transports = g_hash_table_new (g_direct_hash, g_direct_equal);
}
static GstRTSPFilterResult
filter_session_media (GstRTSPSession * sess, GstRTSPSessionMedia * sessmedia,
gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
gst_rtsp_session_media_set_state (sessmedia, GST_STATE_NULL);
unlink_session_transports (client, sess, sessmedia);
/* unmanage the media in the session */
return GST_RTSP_FILTER_REMOVE;
}
@ -386,6 +381,8 @@ gst_rtsp_client_finalize (GObject * obj)
g_assert (priv->sessions == NULL);
g_assert (priv->session_removed_id == 0);
g_hash_table_unref (priv->transports);
if (priv->connection)
gst_rtsp_connection_free (priv->connection);
if (priv->session_pool) {
@ -715,92 +712,6 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
return TRUE;
}
static void
link_transport (GstRTSPClient * client, GstRTSPSession * session,
GstRTSPStreamTransport * trans)
{
GstRTSPClientPrivate *priv = client->priv;
GST_DEBUG ("client %p: linking transport %p", client, trans);
gst_rtsp_stream_transport_set_callbacks (trans,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, client, NULL);
priv->transports = g_list_prepend (priv->transports, trans);
/* make sure our session can't expire */
gst_rtsp_session_prevent_expire (session);
}
static void
link_session_transports (GstRTSPClient * client, GstRTSPSession * session,
GstRTSPSessionMedia * sessmedia)
{
guint n_streams, i;
n_streams =
gst_rtsp_media_n_streams (gst_rtsp_session_media_get_media (sessmedia));
for (i = 0; i < n_streams; i++) {
GstRTSPStreamTransport *trans;
const GstRTSPTransport *tr;
/* get the transport, if there is no transport configured, skip this stream */
trans = gst_rtsp_session_media_get_transport (sessmedia, i);
if (trans == NULL)
continue;
tr = gst_rtsp_stream_transport_get_transport (trans);
if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* for TCP, link the stream to the TCP connection of the client */
link_transport (client, session, trans);
}
}
}
static void
unlink_transport (GstRTSPClient * client, GstRTSPSession * session,
GstRTSPStreamTransport * trans)
{
GstRTSPClientPrivate *priv = client->priv;
GST_DEBUG ("client %p: unlinking transport %p", client, trans);
gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
priv->transports = g_list_remove (priv->transports, trans);
/* our session can now expire */
gst_rtsp_session_allow_expire (session);
}
static void
unlink_session_transports (GstRTSPClient * client, GstRTSPSession * session,
GstRTSPSessionMedia * sessmedia)
{
guint n_streams, i;
n_streams =
gst_rtsp_media_n_streams (gst_rtsp_session_media_get_media (sessmedia));
for (i = 0; i < n_streams; i++) {
GstRTSPStreamTransport *trans;
const GstRTSPTransport *tr;
/* get the transport, if there is no transport configured, skip this stream */
trans = gst_rtsp_session_media_get_transport (sessmedia, i);
if (trans == NULL)
continue;
tr = gst_rtsp_stream_transport_get_transport (trans);
if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* for TCP, unlink the stream from the TCP connection of the client */
unlink_transport (client, session, trans);
}
}
}
/**
* gst_rtsp_client_close:
* @client: a #GstRTSPClient
@ -895,9 +806,6 @@ handle_teardown_request (GstRTSPClient * client, GstRTSPContext * ctx)
if (priv->watch != NULL)
gst_rtsp_watch_set_flushing (priv->watch, TRUE);
/* unlink the all TCP callbacks */
unlink_session_transports (client, session, sessmedia);
gst_rtsp_session_media_set_state (sessmedia, GST_STATE_NULL);
/* allow messages again so that we can send the reply */
@ -1084,9 +992,6 @@ handle_pause_request (GstRTSPClient * client, GstRTSPContext * ctx)
rtspstate != GST_RTSP_STATE_RECORDING)
goto invalid_state;
/* unlink the all TCP callbacks */
unlink_session_transports (client, session, sessmedia);
/* then pause sending */
gst_rtsp_session_media_set_state (sessmedia, GST_STATE_PAUSED);
@ -1223,9 +1128,6 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx)
}
}
/* link the all TCP callbacks */
link_session_transports (client, session, sessmedia);
/* grab RTPInfo from the media now */
rtpinfo = gst_rtsp_session_media_get_rtpinfo (sessmedia);
@ -1926,11 +1828,22 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
/* configure the url used to set this transport, this we will use when
* generating the response for the PLAY request */
gst_rtsp_stream_transport_set_url (trans, uri);
/* configure keepalive for this transport */
gst_rtsp_stream_transport_set_keepalive (trans,
(GstRTSPKeepAliveFunc) do_keepalive, session, NULL);
if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* our callbacks to send data on this TCP connection */
gst_rtsp_stream_transport_set_callbacks (trans,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, client, NULL);
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.min), trans);
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.max), trans);
}
/* create and serialize the server transport */
st = make_server_transport (client, ctx, ct);
trans_str = gst_rtsp_transport_as_text (st);
@ -2610,11 +2523,10 @@ handle_data (GstRTSPClient * client, GstRTSPMessage * message)
GstRTSPClientPrivate *priv = client->priv;
GstRTSPResult res;
guint8 channel;
GList *walk;
guint8 *data;
guint size;
GstBuffer *buffer;
gboolean handled;
GstRTSPStreamTransport *trans;
/* find the stream for this message */
res = gst_rtsp_message_parse_data (message, &channel);
@ -2625,33 +2537,13 @@ handle_data (GstRTSPClient * client, GstRTSPMessage * message)
buffer = gst_buffer_new_wrapped (data, size);
handled = FALSE;
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *trans;
GstRTSPStream *stream;
const GstRTSPTransport *tr;
trans = walk->data;
tr = gst_rtsp_stream_transport_get_transport (trans);
stream = gst_rtsp_stream_transport_get_stream (trans);
/* check for TCP transport */
if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
/* dispatch to the stream based on the channel number */
if (tr->interleaved.min == channel) {
gst_rtsp_stream_recv_rtp (stream, buffer);
handled = TRUE;
break;
} else if (tr->interleaved.max == channel) {
gst_rtsp_stream_recv_rtcp (stream, buffer);
handled = TRUE;
break;
}
}
}
if (!handled)
trans = g_hash_table_lookup (priv->transports, GINT_TO_POINTER (channel));
if (trans) {
/* dispatch to the stream based on the channel number */
gst_rtsp_stream_transport_recv_data (trans, channel, buffer);
} else {
gst_buffer_unref (buffer);
}
}
/**