rtsp: Added new API for sending using GstRTSPWatch.

The new API to send messages using GstRTSPWatch will first try to send the
message immediately. Then, if that failed (or the message was not sent
fully), it will queue the remaining message for later delivery. This avoids
unnecessary context switches, and makes it possible to keep track of
whether the connection is blocked (the unblocking of the connection is
indicated by the reception of the message_sent signal).

This also deprecates the old API (gst_rtsp_watch_queue_data() and
gst_rtsp_watch_queue_message().)

API: gst_rtsp_watch_write_data()
API: gst_rtsp_watch_send_message()
This commit is contained in:
Peter Kjellerstedt 2009-08-17 11:53:43 +02:00
parent 0af04aa4a8
commit 066f9be5c9
2 changed files with 131 additions and 1 deletions

View file

@ -3035,7 +3035,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
/* queue the response */ /* queue the response */
response = gen_tunnel_reply (watch->conn, code, &watch->message); response = gen_tunnel_reply (watch->conn, code, &watch->message);
gst_rtsp_watch_queue_message (watch, response); gst_rtsp_watch_send_message (watch, response, NULL);
gst_rtsp_message_free (response); gst_rtsp_message_free (response);
goto read_done; goto read_done;
} else if (watch->conn->tstate == TUNNEL_STATE_NONE && } else if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@ -3314,6 +3314,119 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
g_source_unref ((GSource *) watch); g_source_unref ((GSource *) watch);
} }
/**
* gst_rtsp_watch_write_data:
* @watch: a #GstRTSPWatch
* @data: the data to queue
* @size: the size of @data
* @id: location for a message ID or %NULL
*
* Write @data using the connection of the @watch. If it cannot be sent
* immediately, it will be queued for transmission in @watch. The contents of
* @message will then be serialized and transmitted when the connection of the
* @watch becomes writable. In case the @message is queued, the ID returned in
* @id will be non-zero and used as the ID argument in the message_sent
* callback.
*
* This function will take ownership of @data and g_free() it after use.
*
* Returns: #GST_RTSP_OK on success.
*
* Since: 0.10.25
*/
GstRTSPResult
gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
guint size, guint * id)
{
GstRTSPResult res;
GstRTSPRec *rec;
guint off = 0;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
g_mutex_lock (watch->mutex);
if (watch->messages->length == 0) {
res = write_bytes (watch->writefd.fd, data, &off, size);
if (res != GST_RTSP_EINTR) {
if (id != NULL)
*id = 0;
g_free ((gpointer) data);
goto done;
}
}
/* make a record with the data and id */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
rec->data = (guint8 *) data;
rec->size = size;
} else {
rec->data = g_memdup (data + off, size - off);
rec->size = size - off;
g_free ((gpointer) data);
}
do {
/* make sure rec->id is never 0 */
rec->id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0));
/* add the record to a queue. FIXME we would like to have an upper limit here */
g_queue_push_head (watch->messages, rec);
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
g_source_add_poll ((GSource *) watch, &watch->writefd);
watch->write_added = TRUE;
}
if (id != NULL)
*id = rec->id;
res = GST_RTSP_OK;
done:
g_mutex_unlock (watch->mutex);
return res;
}
/**
* gst_rtsp_watch_send_message:
* @watch: a #GstRTSPWatch
* @message: a #GstRTSPMessage
* @id: location for a message ID or %NULL
*
* Send a @message using the connection of the @watch. If it cannot be sent
* immediately, it will be queued for transmission in @watch. The contents of
* @message will then be serialized and transmitted when the connection of the
* @watch becomes writable. In case the @message is queued, the ID returned in
* @id will be non-zero and used as the ID argument in the message_sent
* callback.
*
* Returns: #GST_RTSP_OK on success.
*
* Since: 0.10.25
*/
GstRTSPResult
gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
guint * id)
{
GString *str;
guint size;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
/* make a record with the message as a string and id */
str = message_to_string (watch->conn, message);
size = str->len;
return gst_rtsp_watch_write_data (watch,
(guint8 *) g_string_free (str, FALSE), size, id);
}
/** /**
* gst_rtsp_watch_queue_data: * gst_rtsp_watch_queue_data:
* @watch: a #GstRTSPWatch * @watch: a #GstRTSPWatch
@ -3328,10 +3441,13 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
* The return value of this function will be used as the id argument in the * The return value of this function will be used as the id argument in the
* message_sent callback. * message_sent callback.
* *
* Deprecated: Use gst_rtsp_watch_write_data()
*
* Returns: an id. * Returns: an id.
* *
* Since: 0.10.24 * Since: 0.10.24
*/ */
#ifndef GST_REMOVE_DEPRECATED
guint guint
gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
guint size) guint size)
@ -3366,6 +3482,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
g_mutex_unlock (watch->mutex); g_mutex_unlock (watch->mutex);
return rec->id; return rec->id;
} }
#endif /* GST_REMOVE_DEPRECATED */
/** /**
* gst_rtsp_watch_queue_message: * gst_rtsp_watch_queue_message:
@ -3379,10 +3496,13 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
* The return value of this function will be used as the id argument in the * The return value of this function will be used as the id argument in the
* message_sent callback. * message_sent callback.
* *
* Deprecated: Use gst_rtsp_watch_send_message()
*
* Returns: an id. * Returns: an id.
* *
* Since: 0.10.23 * Since: 0.10.23
*/ */
#ifndef GST_REMOVE_DEPRECATED
guint guint
gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
{ {
@ -3398,3 +3518,4 @@ gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
return gst_rtsp_watch_queue_data (watch, return gst_rtsp_watch_queue_data (watch,
(guint8 *) g_string_free (str, FALSE), size); (guint8 *) g_string_free (str, FALSE), size);
} }
#endif /* GST_REMOVE_DEPRECATED */

View file

@ -186,11 +186,20 @@ void gst_rtsp_watch_unref (GstRTSPWatch *watch);
guint gst_rtsp_watch_attach (GstRTSPWatch *watch, guint gst_rtsp_watch_attach (GstRTSPWatch *watch,
GMainContext *context); GMainContext *context);
GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch,
const guint8 *data,
guint size, guint *id);
GstRTSPResult gst_rtsp_watch_send_message (GstRTSPWatch *watch,
GstRTSPMessage *message,
guint *id);
#ifndef GST_DISABLE_DEPRECATED
guint gst_rtsp_watch_queue_data (GstRTSPWatch * watch, guint gst_rtsp_watch_queue_data (GstRTSPWatch * watch,
const guint8 * data, const guint8 * data,
guint size); guint size);
guint gst_rtsp_watch_queue_message (GstRTSPWatch *watch, guint gst_rtsp_watch_queue_message (GstRTSPWatch *watch,
GstRTSPMessage *message); GstRTSPMessage *message);
#endif
G_END_DECLS G_END_DECLS