rtsp-client: Add support for sending buffer lists directly

Fixes https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/29
This commit is contained in:
Sebastian Dröge 2018-09-17 22:18:46 +03:00
parent d708f9736b
commit c372643e1e
4 changed files with 186 additions and 20 deletions

View file

@ -118,6 +118,8 @@ gst_rtsp_client_attach
GstRTSPClientSendFunc
gst_rtsp_client_set_send_func
GstRTSPClientSendMessagesFunc
gst_rtsp_client_set_send_messages_func
gst_rtsp_client_handle_message
gst_rtsp_client_send_message

View file

@ -80,6 +80,9 @@ struct _GstRTSPClientPrivate
GstRTSPClientSendFunc send_func;
gpointer send_data;
GDestroyNotify send_notify;
GstRTSPClientSendMessagesFunc send_messages_func;
gpointer send_messages_data;
GDestroyNotify send_messages_notify;
guint close_seq;
GArray *data_seqs;
@ -753,6 +756,7 @@ gst_rtsp_client_finalize (GObject * obj)
if (priv->watch)
gst_rtsp_watch_set_flushing (priv->watch, TRUE);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
if (priv->watch)
g_source_destroy ((GSource *) priv->watch);
@ -1158,17 +1162,10 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
GstRTSPClientPrivate *priv = client->priv;
GstRTSPMessage message = { 0 };
gboolean ret = TRUE;
GstMapInfo map_info;
guint8 *data;
guint usize;
gst_rtsp_message_init_data (&message, channel);
/* FIXME, need some sort of iovec RTSPMessage here */
if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
return FALSE;
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
gst_rtsp_message_set_body_buffer (&message, buffer);
g_mutex_lock (&priv->send_lock);
if (get_data_seq (client, channel) != 0) {
@ -1180,9 +1177,6 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
ret = priv->send_func (client, &message, FALSE, priv->send_data);
g_mutex_unlock (&priv->send_lock);
gst_rtsp_message_steal_body (&message, &data, &usize);
gst_buffer_unmap (buffer, &map_info);
gst_rtsp_message_unset (&message);
if (!ret) {
@ -1202,18 +1196,50 @@ static gboolean
do_send_data_list (GstBufferList * buffer_list, guint8 channel,
GstRTSPClient * client)
{
GstRTSPClientPrivate *priv = client->priv;
gboolean ret = TRUE;
guint i, n = gst_buffer_list_length (buffer_list);
GstRTSPMessage *messages;
/* TODO: Needs support for a) queueing up multiple messages on the
* GstRTSPWatch in do_send_data() above and b) for one message having a body
* consisting of multiple parts here */
g_mutex_lock (&priv->send_lock);
if (get_data_seq (client, channel) != 0) {
GST_WARNING ("already a queued data message for channel %d", channel);
g_mutex_unlock (&priv->send_lock);
return FALSE;
}
messages = g_newa (GstRTSPMessage, n);
memset (messages, 0, sizeof (GstRTSPMessage) * n);
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
gst_rtsp_message_init_data (&messages[i], channel);
gst_rtsp_message_set_body_buffer (&messages[i], buffer);
}
ret = do_send_data (buffer, channel, client);
if (!ret)
break;
if (priv->send_messages_func) {
ret =
priv->send_messages_func (client, messages, n, FALSE, priv->send_data);
} else if (priv->send_func) {
for (i = 0; i < n; i++) {
ret = priv->send_func (client, &messages[i], FALSE, priv->send_data);
if (!ret)
break;
}
}
g_mutex_unlock (&priv->send_lock);
for (i = 0; i < n; i++) {
gst_rtsp_message_unset (&messages[i]);
}
if (!ret) {
GSource *idle_src;
/* close in watch context */
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, do_close, client, NULL);
g_source_attach (idle_src, priv->watch_context);
g_source_unref (idle_src);
}
return ret;
@ -1252,6 +1278,7 @@ gst_rtsp_client_close (GstRTSPClient * client)
g_source_destroy ((GSource *) priv->watch);
priv->watch = NULL;
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
rtsp_ctrl_timeout_remove (priv);
g_main_context_unref (priv->watch_context);
priv->watch_context = NULL;
@ -4184,6 +4211,47 @@ gst_rtsp_client_set_send_func (GstRTSPClient * client,
old_notify (old_data);
}
/**
* gst_rtsp_client_set_send_messages_func:
* @client: a #GstRTSPClient
* @func: (scope notified): a #GstRTSPClientSendMessagesFunc
* @user_data: (closure): user data passed to @func
* @notify: (allow-none): called when @user_data is no longer in use
*
* Set @func as the callback that will be called when new messages needs to be
* sent to the client. @user_data is passed to @func and @notify is called when
* @user_data is no longer in use.
*
* By default, the client will send the messages on the #GstRTSPConnection that
* was configured with gst_rtsp_client_attach() was called.
*
* Since: 1.16
*/
void
gst_rtsp_client_set_send_messages_func (GstRTSPClient * client,
GstRTSPClientSendMessagesFunc func, gpointer user_data,
GDestroyNotify notify)
{
GstRTSPClientPrivate *priv;
GDestroyNotify old_notify;
gpointer old_data;
g_return_if_fail (GST_IS_RTSP_CLIENT (client));
priv = client->priv;
g_mutex_lock (&priv->send_lock);
priv->send_messages_func = func;
old_notify = priv->send_messages_notify;
old_data = priv->send_messages_data;
priv->send_messages_notify = notify;
priv->send_messages_data = user_data;
g_mutex_unlock (&priv->send_lock);
if (old_notify)
old_notify (old_data);
}
/**
* gst_rtsp_client_handle_message:
* @client: a #GstRTSPClient
@ -4317,6 +4385,71 @@ error:
}
}
static gboolean
do_send_messages (GstRTSPClient * client, GstRTSPMessage * messages,
guint n_messages, gboolean close, gpointer user_data)
{
GstRTSPClientPrivate *priv = client->priv;
guint id = 0;
GstRTSPResult ret;
guint i;
/* send the message */
ret = gst_rtsp_watch_send_messages (priv->watch, messages, n_messages, &id);
if (ret != GST_RTSP_OK)
goto error;
/* if close flag is set, store the seq number so we can wait until it's
* written to the client to close the connection */
if (close)
priv->close_seq = id;
for (i = 0; i < n_messages; i++) {
if (gst_rtsp_message_get_type (&messages[i]) == GST_RTSP_MESSAGE_DATA) {
guint8 channel = 0;
GstRTSPResult r;
/* We assume that all data messages in the list are for the
* same channel */
r = gst_rtsp_message_parse_data (&messages[i], &channel);
if (r != GST_RTSP_OK) {
ret = r;
goto error;
}
/* check if the message has been queued for transmission in watch */
if (id) {
/* store the seq number so we can wait until it has been sent */
GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id,
channel);
set_data_seq (client, channel, id);
} else {
GstRTSPStreamTransport *trans;
trans =
g_hash_table_lookup (priv->transports,
GINT_TO_POINTER ((gint) channel));
if (trans) {
GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
g_mutex_unlock (&priv->send_lock);
gst_rtsp_stream_transport_message_sent (trans);
g_mutex_lock (&priv->send_lock);
}
}
break;
}
}
return ret == GST_RTSP_OK;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (client, "got error %d", ret);
return FALSE;
}
}
static GstRTSPResult
message_received (GstRTSPWatch * watch, GstRTSPMessage * message,
gpointer user_data)
@ -4378,6 +4511,7 @@ closed (GstRTSPWatch * watch, gpointer user_data)
gst_rtsp_watch_set_flushing (watch, TRUE);
g_mutex_lock (&priv->watch_lock);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
g_mutex_unlock (&priv->watch_lock);
return GST_RTSP_OK;
@ -4517,6 +4651,7 @@ handle_tunnel (GstRTSPClient * client)
g_source_destroy ((GSource *) priv->watch);
priv->watch = NULL;
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
}
return GST_RTSP_STS_OK;
@ -4655,6 +4790,8 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context)
gst_rtsp_client_set_send_func (client, do_send_message,
g_source_ref ((GSource *) priv->watch),
(GDestroyNotify) gst_rtsp_watch_unref);
gst_rtsp_client_set_send_messages_func (client, do_send_messages, priv->watch,
(GDestroyNotify) gst_rtsp_watch_unref);
gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);

View file

@ -61,6 +61,27 @@ typedef gboolean (*GstRTSPClientSendFunc) (GstRTSPClient *client,
gboolean close,
gpointer user_data);
/**
* GstRTSPClientSendMessagesFunc:
* @client: a #GstRTSPClient
* @messages: #GstRTSPMessage
* @n_messages: number of messages
* @close: close the connection
* @user_data: user data when registering the callback
*
* This callback is called when @client wants to send @messages. When @close is
* %TRUE, the connection should be closed when the message has been sent.
*
* Returns: %TRUE on success.
*
* Since: 1.16
*/
typedef gboolean (*GstRTSPClientSendMessagesFunc) (GstRTSPClient *client,
GstRTSPMessage *messages,
guint n_messages,
gboolean close,
gpointer user_data);
/**
* GstRTSPClient:
*
@ -195,6 +216,12 @@ void gst_rtsp_client_set_send_func (GstRTSPClient *client,
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
void gst_rtsp_client_set_send_messages_func (GstRTSPClient *client,
GstRTSPClientSendMessagesFunc func,
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
GstRTSPResult gst_rtsp_client_handle_message (GstRTSPClient *client,
GstRTSPMessage *message);

View file

@ -2480,12 +2480,12 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
buffer = gst_sample_get_buffer (sample);
buffer_list = gst_sample_get_buffer_list (sample);
/* We will get one message-sent notification per message,
* i.e. per buffer that is actually sent out */
/* We will get one message-sent notification per buffer or
* complete buffer-list. We handle each buffer-list as a unit */
if (buffer)
n_messages += 1;
if (buffer_list)
n_messages += gst_buffer_list_length (buffer_list);
n_messages += 1;
is_rtp = (idx == 0);