diff --git a/docs/libs/gst-rtsp-server-sections.txt b/docs/libs/gst-rtsp-server-sections.txt index 369a57af8d..c1627da451 100644 --- a/docs/libs/gst-rtsp-server-sections.txt +++ b/docs/libs/gst-rtsp-server-sections.txt @@ -118,6 +118,8 @@ gst_rtsp_client_attach GstRTSPClientSendFunc gst_rtsp_client_set_send_func +GstRTSPClientSendMessagesFunc +gst_rtsp_client_set_send_messages_func gst_rtsp_client_handle_message gst_rtsp_client_send_message diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index cce9222869..020f2fab2c 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -80,6 +80,9 @@ struct _GstRTSPClientPrivate GstRTSPClientSendFunc send_func; gpointer send_data; GDestroyNotify send_notify; + GstRTSPClientSendMessagesFunc send_messages_func; + gpointer send_messages_data; + GDestroyNotify send_messages_notify; guint close_seq; GArray *data_seqs; @@ -753,6 +756,7 @@ gst_rtsp_client_finalize (GObject * obj) if (priv->watch) gst_rtsp_watch_set_flushing (priv->watch, TRUE); gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL); if (priv->watch) g_source_destroy ((GSource *) priv->watch); @@ -1158,17 +1162,10 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) GstRTSPClientPrivate *priv = client->priv; GstRTSPMessage message = { 0 }; gboolean ret = TRUE; - GstMapInfo map_info; - guint8 *data; - guint usize; gst_rtsp_message_init_data (&message, channel); - /* FIXME, need some sort of iovec RTSPMessage here */ - if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ)) - return FALSE; - - gst_rtsp_message_take_body (&message, map_info.data, map_info.size); + gst_rtsp_message_set_body_buffer (&message, buffer); g_mutex_lock (&priv->send_lock); if (get_data_seq (client, channel) != 0) { @@ -1180,9 +1177,6 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) ret = priv->send_func (client, &message, FALSE, priv->send_data); g_mutex_unlock (&priv->send_lock); - gst_rtsp_message_steal_body (&message, &data, &usize); - gst_buffer_unmap (buffer, &map_info); - gst_rtsp_message_unset (&message); if (!ret) { @@ -1202,18 +1196,50 @@ static gboolean do_send_data_list (GstBufferList * buffer_list, guint8 channel, GstRTSPClient * client) { + GstRTSPClientPrivate *priv = client->priv; gboolean ret = TRUE; guint i, n = gst_buffer_list_length (buffer_list); + GstRTSPMessage *messages; - /* TODO: Needs support for a) queueing up multiple messages on the - * GstRTSPWatch in do_send_data() above and b) for one message having a body - * consisting of multiple parts here */ + g_mutex_lock (&priv->send_lock); + if (get_data_seq (client, channel) != 0) { + GST_WARNING ("already a queued data message for channel %d", channel); + g_mutex_unlock (&priv->send_lock); + return FALSE; + } + + messages = g_newa (GstRTSPMessage, n); + memset (messages, 0, sizeof (GstRTSPMessage) * n); for (i = 0; i < n; i++) { GstBuffer *buffer = gst_buffer_list_get (buffer_list, i); + gst_rtsp_message_init_data (&messages[i], channel); + gst_rtsp_message_set_body_buffer (&messages[i], buffer); + } - ret = do_send_data (buffer, channel, client); - if (!ret) - break; + if (priv->send_messages_func) { + ret = + priv->send_messages_func (client, messages, n, FALSE, priv->send_data); + } else if (priv->send_func) { + for (i = 0; i < n; i++) { + ret = priv->send_func (client, &messages[i], FALSE, priv->send_data); + if (!ret) + break; + } + } + g_mutex_unlock (&priv->send_lock); + + for (i = 0; i < n; i++) { + gst_rtsp_message_unset (&messages[i]); + } + + if (!ret) { + GSource *idle_src; + + /* close in watch context */ + idle_src = g_idle_source_new (); + g_source_set_callback (idle_src, do_close, client, NULL); + g_source_attach (idle_src, priv->watch_context); + g_source_unref (idle_src); } return ret; @@ -1252,6 +1278,7 @@ gst_rtsp_client_close (GstRTSPClient * client) g_source_destroy ((GSource *) priv->watch); priv->watch = NULL; gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL); rtsp_ctrl_timeout_remove (priv); g_main_context_unref (priv->watch_context); priv->watch_context = NULL; @@ -4184,6 +4211,47 @@ gst_rtsp_client_set_send_func (GstRTSPClient * client, old_notify (old_data); } +/** + * gst_rtsp_client_set_send_messages_func: + * @client: a #GstRTSPClient + * @func: (scope notified): a #GstRTSPClientSendMessagesFunc + * @user_data: (closure): user data passed to @func + * @notify: (allow-none): called when @user_data is no longer in use + * + * Set @func as the callback that will be called when new messages needs to be + * sent to the client. @user_data is passed to @func and @notify is called when + * @user_data is no longer in use. + * + * By default, the client will send the messages on the #GstRTSPConnection that + * was configured with gst_rtsp_client_attach() was called. + * + * Since: 1.16 + */ +void +gst_rtsp_client_set_send_messages_func (GstRTSPClient * client, + GstRTSPClientSendMessagesFunc func, gpointer user_data, + GDestroyNotify notify) +{ + GstRTSPClientPrivate *priv; + GDestroyNotify old_notify; + gpointer old_data; + + g_return_if_fail (GST_IS_RTSP_CLIENT (client)); + + priv = client->priv; + + g_mutex_lock (&priv->send_lock); + priv->send_messages_func = func; + old_notify = priv->send_messages_notify; + old_data = priv->send_messages_data; + priv->send_messages_notify = notify; + priv->send_messages_data = user_data; + g_mutex_unlock (&priv->send_lock); + + if (old_notify) + old_notify (old_data); +} + /** * gst_rtsp_client_handle_message: * @client: a #GstRTSPClient @@ -4317,6 +4385,71 @@ error: } } +static gboolean +do_send_messages (GstRTSPClient * client, GstRTSPMessage * messages, + guint n_messages, gboolean close, gpointer user_data) +{ + GstRTSPClientPrivate *priv = client->priv; + guint id = 0; + GstRTSPResult ret; + guint i; + + /* send the message */ + ret = gst_rtsp_watch_send_messages (priv->watch, messages, n_messages, &id); + if (ret != GST_RTSP_OK) + goto error; + + /* if close flag is set, store the seq number so we can wait until it's + * written to the client to close the connection */ + if (close) + priv->close_seq = id; + + for (i = 0; i < n_messages; i++) { + if (gst_rtsp_message_get_type (&messages[i]) == GST_RTSP_MESSAGE_DATA) { + guint8 channel = 0; + GstRTSPResult r; + + /* We assume that all data messages in the list are for the + * same channel */ + r = gst_rtsp_message_parse_data (&messages[i], &channel); + if (r != GST_RTSP_OK) { + ret = r; + goto error; + } + + /* check if the message has been queued for transmission in watch */ + if (id) { + /* store the seq number so we can wait until it has been sent */ + GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id, + channel); + set_data_seq (client, channel, id); + } else { + GstRTSPStreamTransport *trans; + + trans = + g_hash_table_lookup (priv->transports, + GINT_TO_POINTER ((gint) channel)); + if (trans) { + GST_DEBUG_OBJECT (client, "emit 'message-sent' signal"); + g_mutex_unlock (&priv->send_lock); + gst_rtsp_stream_transport_message_sent (trans); + g_mutex_lock (&priv->send_lock); + } + } + break; + } + } + + return ret == GST_RTSP_OK; + + /* ERRORS */ +error: + { + GST_DEBUG_OBJECT (client, "got error %d", ret); + return FALSE; + } +} + static GstRTSPResult message_received (GstRTSPWatch * watch, GstRTSPMessage * message, gpointer user_data) @@ -4378,6 +4511,7 @@ closed (GstRTSPWatch * watch, gpointer user_data) gst_rtsp_watch_set_flushing (watch, TRUE); g_mutex_lock (&priv->watch_lock); gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL); g_mutex_unlock (&priv->watch_lock); return GST_RTSP_OK; @@ -4517,6 +4651,7 @@ handle_tunnel (GstRTSPClient * client) g_source_destroy ((GSource *) priv->watch); priv->watch = NULL; gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL); } return GST_RTSP_STS_OK; @@ -4655,6 +4790,8 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context) gst_rtsp_client_set_send_func (client, do_send_message, g_source_ref ((GSource *) priv->watch), (GDestroyNotify) gst_rtsp_watch_unref); + gst_rtsp_client_set_send_messages_func (client, do_send_messages, priv->watch, + (GDestroyNotify) gst_rtsp_watch_unref); gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); diff --git a/gst/rtsp-server/rtsp-client.h b/gst/rtsp-server/rtsp-client.h index 01846bc249..ebf6aeca27 100644 --- a/gst/rtsp-server/rtsp-client.h +++ b/gst/rtsp-server/rtsp-client.h @@ -61,6 +61,27 @@ typedef gboolean (*GstRTSPClientSendFunc) (GstRTSPClient *client, gboolean close, gpointer user_data); +/** + * GstRTSPClientSendMessagesFunc: + * @client: a #GstRTSPClient + * @messages: #GstRTSPMessage + * @n_messages: number of messages + * @close: close the connection + * @user_data: user data when registering the callback + * + * This callback is called when @client wants to send @messages. When @close is + * %TRUE, the connection should be closed when the message has been sent. + * + * Returns: %TRUE on success. + * + * Since: 1.16 + */ +typedef gboolean (*GstRTSPClientSendMessagesFunc) (GstRTSPClient *client, + GstRTSPMessage *messages, + guint n_messages, + gboolean close, + gpointer user_data); + /** * GstRTSPClient: * @@ -195,6 +216,12 @@ void gst_rtsp_client_set_send_func (GstRTSPClient *client, gpointer user_data, GDestroyNotify notify); +GST_RTSP_SERVER_API +void gst_rtsp_client_set_send_messages_func (GstRTSPClient *client, + GstRTSPClientSendMessagesFunc func, + gpointer user_data, + GDestroyNotify notify); + GST_RTSP_SERVER_API GstRTSPResult gst_rtsp_client_handle_message (GstRTSPClient *client, GstRTSPMessage *message); diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index b9d2e7f286..11612578b4 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -2480,12 +2480,12 @@ send_tcp_message (GstRTSPStream * stream, gint idx) buffer = gst_sample_get_buffer (sample); buffer_list = gst_sample_get_buffer_list (sample); - /* We will get one message-sent notification per message, - * i.e. per buffer that is actually sent out */ + /* We will get one message-sent notification per buffer or + * complete buffer-list. We handle each buffer-list as a unit */ if (buffer) n_messages += 1; if (buffer_list) - n_messages += gst_buffer_list_length (buffer_list); + n_messages += 1; is_rtp = (idx == 0);