rtsp: Call message_sent() callback for all sent messages.

Previously the messages_sent() callback was only called for messages
which had a CSeq, which excluded all data messages. Instead of using the
CSeq as ID, use a simple index counter.
This commit is contained in:
Peter Kjellerstedt 2009-06-12 15:11:05 +02:00
parent 12134979a2
commit ff38999c8b
2 changed files with 23 additions and 40 deletions

View file

@ -158,8 +158,6 @@ typedef enum
#define TUNNELID_LEN 24 #define TUNNELID_LEN 24
#define UNKNOWN_CSEQ ((guint) -1)
struct _GstRTSPConnection struct _GstRTSPConnection
{ {
/*< private > */ /*< private > */
@ -2635,11 +2633,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
typedef struct typedef struct
{ {
GString *str; GString *str;
guint cseq; guint id;
} GstRTSPRec; } GstRTSPRec;
static GstRTSPRec *queue_response (GstRTSPWatch * watch, GString * str, static guint queue_response (GstRTSPWatch * watch, GString * str);
guint cseq);
/* async functions */ /* async functions */
struct _GstRTSPWatch struct _GstRTSPWatch
@ -2656,11 +2653,12 @@ struct _GstRTSPWatch
gboolean write_added; gboolean write_added;
/* queued message for transmission */ /* queued message for transmission */
guint id;
GAsyncQueue *messages; GAsyncQueue *messages;
guint8 *write_data; guint8 *write_data;
guint write_off; guint write_off;
guint write_len; guint write_len;
guint write_cseq; guint write_id;
GstRTSPWatchFuncs funcs; GstRTSPWatchFuncs funcs;
@ -2718,7 +2716,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
/* queue the response string */ /* queue the response string */
str = gen_tunnel_reply (watch->conn, code); str = gen_tunnel_reply (watch->conn, code);
queue_response (watch, str, UNKNOWN_CSEQ); queue_response (watch, str);
} else if (res == GST_RTSP_ETPOST) { } else if (res == GST_RTSP_ETPOST) {
/* in the callback the connection should be tunneled with the /* in the callback the connection should be tunneled with the
* GET connection */ * GET connection */
@ -2751,7 +2749,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
watch->write_off = 0; watch->write_off = 0;
watch->write_len = data->str->len; watch->write_len = data->str->len;
watch->write_data = (guint8 *) g_string_free (data->str, FALSE); watch->write_data = (guint8 *) g_string_free (data->str, FALSE);
watch->write_cseq = data->cseq; watch->write_id = data->id;
data->str = NULL; data->str = NULL;
g_slice_free (GstRTSPRec, data); g_slice_free (GstRTSPRec, data);
@ -2764,8 +2762,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
if (G_UNLIKELY (res != GST_RTSP_OK)) if (G_UNLIKELY (res != GST_RTSP_OK))
goto error; goto error;
if (watch->funcs.message_sent && watch->write_cseq != UNKNOWN_CSEQ) if (watch->funcs.message_sent)
watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data); watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
done: done:
if (g_async_queue_length (watch->messages) == 0 && watch->write_added) { if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
@ -2951,21 +2949,21 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
g_source_unref ((GSource *) watch); g_source_unref ((GSource *) watch);
} }
static GstRTSPRec * static guint
queue_response (GstRTSPWatch * watch, GString * str, guint cseq) queue_response (GstRTSPWatch * watch, GString * str)
{ {
GstRTSPRec *data; GstRTSPRec *data;
/* make a record with the message as a string and cseq */ /* make a record with the message as a string and id */
data = g_slice_new (GstRTSPRec); data = g_slice_new (GstRTSPRec);
data->str = str; data->str = str;
data->cseq = cseq; data->id = ++watch->id;
/* add the record to a queue. FIXME we would like to have an upper limit here */ /* add the record to a queue. FIXME we would like to have an upper limit here */
g_async_queue_push (watch->messages, data); g_async_queue_push (watch->messages, data);
/* FIXME: does the following need to be made thread-safe? (queue_response /* FIXME: does the following need to be made thread-safe? (this might be
* might be called from a streaming thread, like appsink's render function) */ * called from a streaming thread, like appsink's render function) */
/* make sure the main context will now also check for writability on the /* make sure the main context will now also check for writability on the
* socket */ * socket */
if (!watch->write_added) { if (!watch->write_added) {
@ -2973,7 +2971,7 @@ queue_response (GstRTSPWatch * watch, GString * str, guint cseq)
watch->write_added = TRUE; watch->write_added = TRUE;
} }
return data; return data->id;
} }
/** /**
@ -2983,36 +2981,21 @@ queue_response (GstRTSPWatch * watch, GString * str, guint cseq)
* *
* Queue a @message for transmission in @watch. The contents of this * Queue a @message for transmission in @watch. The contents of this
* message will be serialized and transmitted when the connection of the * message will be serialized and transmitted when the connection of the
* watch becomes writable. * @watch becomes writable.
* *
* The return value of this function will be returned as the cseq argument in * The return value of this function will be used as the id argument in the
* the message_sent callback. * message_sent callback.
* *
* Returns: the sequence number of the message or -1 if the cseq could not be * Returns: an id.
* determined.
* *
* Since: 0.10.23 * Since: 0.10.23
*/ */
guint guint
gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message) gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
{ {
gchar *header;
guint cseq;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
/* get the cseq from the message, when we finish writing this message on the /* make a record with the message as a string and id */
* socket we will have to pass the cseq to the callback. */ return queue_response (watch, message_to_string (watch->conn, message));
if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header,
0) == GST_RTSP_OK) {
cseq = atoi (header);
} else {
cseq = UNKNOWN_CSEQ;
}
/* make a record with the message as a string and cseq */
queue_response (watch, message_to_string (watch->conn, message), cseq);
return cseq;
} }

View file

@ -151,7 +151,7 @@ typedef struct _GstRTSPWatch GstRTSPWatch;
typedef struct { typedef struct {
GstRTSPResult (*message_received) (GstRTSPWatch *watch, GstRTSPMessage *message, GstRTSPResult (*message_received) (GstRTSPWatch *watch, GstRTSPMessage *message,
gpointer user_data); gpointer user_data);
GstRTSPResult (*message_sent) (GstRTSPWatch *watch, guint cseq, GstRTSPResult (*message_sent) (GstRTSPWatch *watch, guint id,
gpointer user_data); gpointer user_data);
GstRTSPResult (*closed) (GstRTSPWatch *watch, gpointer user_data); GstRTSPResult (*closed) (GstRTSPWatch *watch, gpointer user_data);
GstRTSPResult (*error) (GstRTSPWatch *watch, GstRTSPResult result, GstRTSPResult (*error) (GstRTSPWatch *watch, GstRTSPResult result,