rtpbin: more buffering updates

Add signal to pause the jitterbuffer. This will be emitted from gstrtpbin when
one of the jitterbuffers is buffering.
Make rtpbin collect the buffering messages and post a new buffering message with
the min value.
Remove the stats callback from jitterbuffer but pass a percent integer to
functions that affect the buffering state of the jitterbuffer. This allows us
then to post buffering messages from outside of the jitterbuffer lock.
This commit is contained in:
Wim Taymans 2009-10-05 19:45:35 +02:00
parent a5b9d3f917
commit 20a27a545a
7 changed files with 149 additions and 61 deletions

View file

@ -6,3 +6,4 @@ VOID:UINT,OBJECT
VOID:UINT
VOID:UINT,UINT
VOID:OBJECT,OBJECT
VOID:BOOL,UINT64

View file

@ -297,7 +297,7 @@ struct _GstRtpBinStream
gulong buffer_handlesync_sig;
gulong buffer_ptreq_sig;
gulong buffer_ntpstop_sig;
gboolean buffering;
gint percent;
/* the PT demuxer of the SSRC */
GstElement *demux;
@ -1167,6 +1167,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
stream->have_sync = FALSE;
stream->unix_delta = 0;
stream->percent = 100;
session->streams = g_slist_prepend (session->streams, stream);
/* provide clock_rate to the jitterbuffer when needed */
@ -1175,6 +1176,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
(GCallback) on_npt_stop, stream);
g_object_set_data (G_OBJECT (buffer), "GstRtpBinSession", session);
g_object_set_data (G_OBJECT (buffer), "GstRtpBinStream", stream);
/* configure latency and packet lost */
g_object_set (buffer, "latency", rtpbin->latency, NULL);
g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
@ -1779,9 +1783,80 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
case GST_MESSAGE_BUFFERING:
{
gint percent;
gint min_percent = 100;
GSList *sessions, *streams, *elements = NULL;
GstRtpBinStream *stream;
guint64 base_time = 0;
gboolean change = FALSE, active = FALSE;
gst_message_parse_buffering (message, &percent);
stream =
g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
"GstRtpBinStream");
GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
/* get the stream */
if (stream) {
GST_RTP_BIN_LOCK (rtpbin);
/* fill in the percent */
stream->percent = percent;
for (sessions = rtpbin->sessions; sessions;
sessions = g_slist_next (sessions)) {
GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
GST_RTP_SESSION_LOCK (session);
for (streams = session->streams; streams;
streams = g_slist_next (streams)) {
GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
GstElement *element = stream->buffer;
GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
stream->percent);
/* find min percent */
if (min_percent > stream->percent)
min_percent = stream->percent;
elements = g_slist_prepend (elements, gst_object_ref (element));
}
GST_RTP_SESSION_UNLOCK (session);
}
GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
if (rtpbin->buffering) {
if (min_percent == 100) {
rtpbin->buffering = FALSE;
active = TRUE;
change = TRUE;
}
} else {
/* pause the streams */
rtpbin->buffering = TRUE;
active = FALSE;
change = TRUE;
}
GST_RTP_BIN_UNLOCK (rtpbin);
gst_message_unref (message);
message =
gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
if (change) {
while (elements) {
GstElement *element = elements->data;
GST_DEBUG_OBJECT (bin, "setting %p to %d", element, active);
g_signal_emit_by_name (element, "set-active", active, base_time,
NULL);
gst_object_unref (element);
elements = g_slist_delete_link (elements, elements);
}
}
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}

View file

@ -49,6 +49,7 @@ struct _GstRtpBin {
gboolean do_lost;
gboolean ignore_pt;
RTPJitterBufferMode buffer_mode;
gboolean buffering;
/* a list of session */
GSList *sessions;

View file

@ -85,6 +85,7 @@ enum
SIGNAL_CLEAR_PT_MAP,
SIGNAL_HANDLE_SYNC,
SIGNAL_ON_NPT_STOP,
SIGNAL_SET_ACTIVE,
LAST_SIGNAL
};
@ -134,6 +135,7 @@ struct _GstRtpJitterBufferPrivate
GCond *jbuf_cond;
gboolean waiting;
gboolean discont;
gboolean active;
/* properties */
guint latency_ms;
@ -266,8 +268,8 @@ static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
static void
gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
static void
do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
GstRtpJitterBuffer * jitterbuffer);
gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
gboolean active, guint64 base_time);
static void
gst_rtp_jitter_buffer_base_init (gpointer klass)
@ -403,6 +405,20 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRtpJitterBuffer::set-active:
* @buffer: the object which received the signal
*
* Start pushing out packets with the given base time. This signal is only
* useful in buffering mode.
*/
gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
gst_rtp_bin_marshal_VOID__BOOL_UINT64, G_TYPE_NONE, 2, G_TYPE_BOOLEAN,
G_TYPE_UINT64);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
gstelement_class->request_new_pad =
@ -411,6 +427,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
@ -432,8 +449,6 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
priv->jbuf = rtp_jitter_buffer_new ();
rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
rtp_jitter_buffer_set_stats_cb (priv->jbuf, (RTPBufferingStats) do_stats_cb,
jitterbuffer);
priv->jbuf_lock = g_mutex_new ();
priv->jbuf_cond = g_cond_new ();
@ -631,6 +646,22 @@ gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
JBUF_UNLOCK (priv);
}
static void
gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
guint64 base_time)
{
GstRtpJitterBufferPrivate *priv;
priv = jbuf->priv;
JBUF_LOCK (priv);
GST_DEBUG_OBJECT (jbuf, "setting active %d at time %" GST_TIME_FORMAT, active,
GST_TIME_ARGS (base_time));
priv->active = active;
JBUF_SIGNAL (priv);
JBUF_UNLOCK (priv);
}
static GstCaps *
gst_rtp_jitter_buffer_getcaps (GstPad * pad)
{
@ -1119,8 +1150,7 @@ parse_failed:
}
static void
do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
GstRtpJitterBuffer * jitterbuffer)
post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
{
GstMessage *message;
@ -1141,6 +1171,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
GstClockTime timestamp;
guint64 latency_ts;
gboolean tail;
gint percent = -1;
guint8 pt;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@ -1256,7 +1287,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
GstBuffer *old_buf;
old_buf = rtp_jitter_buffer_pop (priv->jbuf);
old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
gst_rtp_buffer_get_seq (old_buf));
@ -1273,7 +1304,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
priv->clock_rate, priv->latency_ns, &tail)))
priv->clock_rate, &tail, &percent)))
goto duplicate;
/* signal addition of new buffer when the _loop is waiting. */
@ -1295,6 +1326,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
finished:
JBUF_UNLOCK (priv);
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
gst_object_unref (jitterbuffer);
return ret;
@ -1427,6 +1461,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
GstClock *clock;
GstClockID id;
GstClockTime sync_time;
gint percent = -1;
priv = jitterbuffer->priv;
@ -1438,7 +1473,8 @@ again:
/* always wait if we are blocked */
if (G_LIKELY (!priv->blocked)) {
/* we're buffering but not EOS, wait. */
if (!priv->eos && rtp_jitter_buffer_is_buffering (priv->jbuf))
if (!priv->eos && (!priv->active
|| rtp_jitter_buffer_is_buffering (priv->jbuf)))
goto do_wait;
/* if we have a packet, we can exit the loop and grab it */
if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
@ -1517,7 +1553,7 @@ again:
if (G_UNLIKELY (gap < 0)) {
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
gst_buffer_unref (outbuf);
goto again;
}
@ -1656,7 +1692,7 @@ again:
push_buffer:
/* when we get here we are ready to pop and push the buffer */
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
if (G_UNLIKELY (discont || priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
@ -1718,6 +1754,9 @@ push_buffer:
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
/* push buffer */
GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,

View file

@ -77,6 +77,8 @@ struct _GstRtpJitterBufferClass
/* actions */
void (*clear_pt_map) (GstRtpJitterBuffer *buffer);
void (*set_active) (GstRtpJitterBuffer *buffer, gboolean active, guint64 base_time);
/*< private > */
gpointer _gst_reserved[GST_PADDING];
};

View file

@ -119,24 +119,6 @@ rtp_jitter_buffer_new (void)
return jbuf;
}
/**
* rtp_jitter_buffer_set_stats_cb:
* @jbuf: an #RTPJitterBuffer
* @stats: the stats callback
* @user_data: user data passed to the callback
*
* Install a callbacl that will be called when the buffering state of @jbuf
* changed.
*/
void
rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer * jbuf,
RTPBufferingStats stats_cb, gpointer user_data)
{
jbuf->stats_cb = stats_cb;
jbuf->stats_data = user_data;
}
/**
* rtp_jitter_buffer_get_mode:
* @jbuf: an #RTPJitterBuffer
@ -225,7 +207,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
}
static void
update_buffer_level (RTPJitterBuffer * jbuf)
update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
{
GstBuffer *high_buf, *low_buf;
gboolean post = FALSE;
@ -262,19 +244,19 @@ update_buffer_level (RTPJitterBuffer * jbuf)
}
}
if (post) {
gint percent;
gint perc;
if (jbuf->buffering) {
percent = (jbuf->level * 100 / jbuf->delay);
percent = MIN (percent, 100);
perc = (jbuf->level * 100 / jbuf->delay);
perc = MIN (perc, 100);
} else {
percent = 100;
perc = 100;
}
if (jbuf->stats_cb)
jbuf->stats_cb (jbuf, percent, jbuf->stats_data);
if (percent)
*percent = perc;
GST_DEBUG ("buffering %d", percent);
GST_DEBUG ("buffering %d", perc);
}
}
@ -575,8 +557,7 @@ no_skew:
*/
gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
GstClockTime time, guint32 clock_rate, GstClockTime max_delay,
gboolean * tail)
GstClockTime time, guint32 clock_rate, gboolean * tail, gint * percent)
{
GList *list;
guint32 rtptime;
@ -642,7 +623,9 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
update_buffer_level (jbuf);
update_buffer_level (jbuf, percent);
else
*percent = -1;
/* tail was changed when we did not find a previous packet, we set the return
* flag when requested. */
@ -662,6 +645,7 @@ duplicate:
/**
* rtp_jitter_buffer_pop:
* @jbuf: an #RTPJitterBuffer
* @percent: the buffering percent
*
* Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will
* have its timestamp adjusted with the incomming running_time and the detected
@ -670,7 +654,7 @@ duplicate:
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
GstBuffer *
rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
{
GstBuffer *buf;
@ -680,7 +664,9 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
update_buffer_level (jbuf);
update_buffer_level (jbuf, percent);
else
*percent = -1;
return buf;
}

View file

@ -58,16 +58,6 @@ typedef enum {
#define RTP_TYPE_JITTER_BUFFER_MODE (rtp_jitter_buffer_mode_get_type())
GType rtp_jitter_buffer_mode_get_type (void);
/**
* RTPBufferingStats:
* @jbuf: an #RTPJitterBuffer
* @percent: the buffering percent
* @user_data: user data specified when registering
*
* Called when buffering is going on in @jbuf.
*/
typedef void (*RTPBufferingStats) (RTPJitterBuffer *jbuf, guint percent, gpointer user_data);
#define RTP_JITTER_BUFFER_MAX_WINDOW 512
/**
* RTPJitterBuffer:
@ -88,8 +78,6 @@ struct _RTPJitterBuffer {
guint64 level;
guint64 low_level;
guint64 high_level;
RTPBufferingStats stats_cb;
gpointer stats_data;
/* for calculating skew */
GstClockTime base_time;
@ -117,9 +105,6 @@ GType rtp_jitter_buffer_get_type (void);
/* managing lifetime */
RTPJitterBuffer* rtp_jitter_buffer_new (void);
void rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer *jbuf, RTPBufferingStats stats_cb,
gpointer user_data);
RTPJitterBufferMode rtp_jitter_buffer_get_mode (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_mode (RTPJitterBuffer *jbuf, RTPJitterBufferMode mode);
@ -131,10 +116,9 @@ void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf)
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
GstClockTime time,
guint32 clock_rate,
GstClockTime max_delay,
gboolean *tail);
gboolean *tail, gint *percent);
GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);