gst-libs/gst/rtp/gstbasertpdepayload.*: Made a thread to release the queue.

Original commit message from CVS:
2005-08-12  Philippe Khalaf <burger@speedy.org>
* gst-libs/gst/rtp/gstbasertpdepayload.c:
* gst-libs/gst/rtp/gstbasertpdepayload.h:
Made a thread to release the queue.
Removed timestamp conversion for now.
This commit is contained in:
Philippe Kalaf 2005-08-12 13:34:56 +00:00
parent b50d3fe5f6
commit 96a0b1b9b9
3 changed files with 151 additions and 26 deletions

View file

@ -1,3 +1,9 @@
2005-08-12 Philippe Khalaf <burger@speedy.org>
* gst-libs/gst/rtp/gstbasertpdepayload.c:
* gst-libs/gst/rtp/gstbasertpdepayload.h:
Made a thread to release the queue.
Removed timestamp conversion for now.
2005-08-10 Philippe Khalaf <burger@speedy.org>
* gst-libs/gst/rtp/gstbasertpdepayload.c:
* gst-libs/gst/rtp/gstbasertpdepayload.h:

View file

@ -88,8 +88,10 @@ static gboolean gst_base_rtp_depayload_setcaps (GstPad * pad, GstCaps * caps);
static GstFlowReturn gst_base_rtp_depayload_chain (GstPad * pad,
GstBuffer * in);
static GstFlowReturn gst_base_rtp_depayload_add_to_queue
(GstBaseRTPDepayload * filter, GstRTPBuffer * in);
static GstElementStateReturn gst_base_rtp_depayload_change_state (GstElement *
element);
static GstFlowReturn gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload *
filter, GstRTPBuffer * in);
static void gst_base_rtp_depayload_set_gst_timestamp
(GstBaseRTPDepayload * filter, guint32 timestamp, GstBuffer * buf);
@ -108,8 +110,10 @@ static void
gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = (GstElementClass *) klass;
gobject_class->set_property = gst_base_rtp_depayload_set_property;
gobject_class->get_property = gst_base_rtp_depayload_get_property;
@ -124,6 +128,8 @@ gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass * klass)
gobject_class->finalize = gst_base_rtp_depayload_finalize;
gstelement_class->change_state = gst_base_rtp_depayload_change_state;
klass->add_to_queue = gst_base_rtp_depayload_add_to_queue;
klass->set_gst_timestamp = gst_base_rtp_depayload_set_gst_timestamp;
@ -158,6 +164,9 @@ gst_base_rtp_depayload_init (GstBaseRTPDepayload * filter, gpointer g_class)
filter->queue_delay = RTP_QUEUEDELAY;
// init queue mutex
QUEUE_LOCK_INIT (filter);
// this one needs to be overwritten by child
filter->clock_rate = 0;
}
@ -234,6 +243,7 @@ gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * filter,
GQueue *queue = filter->queue;
// our first packet, just push it
QUEUE_LOCK (filter);
if (g_queue_is_empty (queue)) {
g_queue_push_tail (queue, in);
} else
@ -255,28 +265,8 @@ gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * filter,
GST_DEBUG ("Packet added to queue %d at pos %d timestamp %u sn %d",
g_queue_get_length (queue), i, in->timestamp, in->seqnum);
// if our queue is getting to big (more than RTP_QUEUEDELAY ms of data)
// release heading buffers
//GST_DEBUG("clockrate %d, queu_delay %d", filter->clock_rate, filter->queue_delay);
gfloat q_size_secs = (gfloat) filter->queue_delay / 1000;
guint maxtsunits = (gfloat) filter->clock_rate * q_size_secs;
GST_DEBUG ("maxtsunit is %u", maxtsunits);
GST_DEBUG ("ts %d %d %d %d", in->timestamp, in->seqnum,
GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp,
GST_RTPBUFFER (g_queue_peek_tail (queue))->seqnum);
while (in->timestamp -
GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp > maxtsunits) {
GST_DEBUG ("Poping packet from queue");
GstBaseRTPDepayloadClass *bclass =
GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
if (bclass->process) {
GstRTPBuffer *in = g_queue_pop_tail (queue);
gst_base_rtp_depayload_push (filter, GST_RTPBUFFER (in));
}
}
}
QUEUE_UNLOCK (filter);
return GST_FLOW_OK;
}
@ -298,7 +288,7 @@ gst_base_rtp_depayload_push (GstBaseRTPDepayload * filter,
// is the same as the timestamp wanted on the collector
// maybe i should add a way to override this timestamp from the
// depayloader child class
bclass->set_gst_timestamp (filter, rtp_buf->timestamp, out_buf);
//bclass->set_gst_timestamp (filter, rtp_buf->timestamp, out_buf);
// push it
GST_DEBUG ("Pushing buffer size %d, timestamp %u",
GST_BUFFER_SIZE (out_buf), GST_BUFFER_TIMESTAMP (out_buf));
@ -315,8 +305,7 @@ gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter,
// rtp timestamps are based on the clock_rate
// gst timesamps are in nanoseconds
GST_DEBUG ("calculating ts : timestamp : %u, clockrate : %u", timestamp,
filter->clock_rate);
//GST_DEBUG("calculating ts : timestamp : %u, clockrate : %u", timestamp, filter->clock_rate);
guint64 ts = ((timestamp * GST_SECOND) / filter->clock_rate);
GST_BUFFER_TIMESTAMP (buf) = ts;
@ -337,6 +326,124 @@ gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter,
}
}
static void
gst_base_rtp_depayload_queue_release (GstBaseRTPDepayload * filter)
{
GQueue *queue = filter->queue;
if (g_queue_is_empty (queue))
return;
// if our queue is getting to big (more than RTP_QUEUEDELAY ms of data)
// release heading buffers
//GST_DEBUG("clockrate %d, queu_delay %d", filter->clock_rate, filter->queue_delay);
gfloat q_size_secs = (gfloat) filter->queue_delay / 1000;
guint maxtsunits = (gfloat) filter->clock_rate * q_size_secs;
//GST_DEBUG("maxtsunit is %u", maxtsunits);
//GST_DEBUG("ts %d %d %d", GST_RTPBUFFER(g_queue_peek_head (queue))->timestamp, GST_RTPBUFFER(g_queue_peek_tail (queue))->timestamp);
QUEUE_LOCK (filter);
while (GST_RTPBUFFER (g_queue_peek_head (queue))->timestamp -
GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp > maxtsunits) {
//GST_DEBUG("Poping packet from queue");
GstBaseRTPDepayloadClass *bclass =
GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
if (bclass->process) {
GstRTPBuffer *in = g_queue_pop_tail (queue);
gst_base_rtp_depayload_push (filter, GST_RTPBUFFER (in));
}
}
QUEUE_UNLOCK (filter);
}
static gpointer
gst_base_rtp_depayload_thread (GstBaseRTPDepayload * filter)
{
while (filter->thread_running) {
gst_base_rtp_depayload_queue_release (filter);
// i want to run this thread clock_rate times per second
g_usleep (1000000 / filter->clock_rate);
//g_usleep (1000000);
}
return NULL;
}
static gboolean
gst_base_rtp_depayload_start_thread (GstBaseRTPDepayload * filter)
{
GST_DEBUG ("Starting queue release thread");
filter->thread_running = TRUE;
filter->thread = g_thread_create ((GThreadFunc) gst_base_rtp_depayload_thread,
filter, TRUE, NULL);
GST_DEBUG ("Started queue release thread");
return TRUE;
}
static gboolean
gst_base_rtp_depayload_stop_thread (GstBaseRTPDepayload * filter)
{
filter->thread_running = FALSE;
if (filter->thread) {
g_thread_join (filter->thread);
filter->thread = NULL;
}
QUEUE_LOCK_FREE (filter);
return TRUE;
}
static GstElementStateReturn
gst_base_rtp_depayload_change_state (GstElement * element)
{
GstBaseRTPDepayload *filter;
gint transition;
// GstElementStateReturn ret;
g_return_val_if_fail (GST_IS_BASE_RTP_DEPAYLOAD (element), GST_STATE_FAILURE);
filter = GST_BASE_RTP_DEPAYLOAD (element);
/* we disallow changing the state from the thread */
if (g_thread_self () == filter->thread)
return GST_STATE_FAILURE;
transition = GST_STATE_TRANSITION (element);
switch (transition) {
case GST_STATE_NULL_TO_READY:
if (!gst_base_rtp_depayload_start_thread (filter))
goto start_failed;
break;
case GST_STATE_READY_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
default:
break;
}
// ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
switch (transition) {
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
break;
case GST_STATE_READY_TO_NULL:
gst_base_rtp_depayload_stop_thread (filter);
break;
}
return GST_STATE_SUCCESS;
/* ERRORS */
start_failed:
{
return GST_STATE_FAILURE;
}
}
static void
gst_base_rtp_depayload_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)

View file

@ -41,6 +41,11 @@ G_BEGIN_DECLS
// in milliseconds
#define RTP_QUEUEDELAY 100;
#define QUEUE_LOCK_INIT(base) (g_static_rec_mutex_init(&base->queuelock))
#define QUEUE_LOCK_FREE(base) (g_static_rec_mutex_free(&base->queuelock))
#define QUEUE_LOCK(base) (g_static_rec_mutex_lock(&base->queuelock))
#define QUEUE_UNLOCK(base) (g_static_rec_mutex_unlock(&base->queuelock))
typedef struct _GstBaseRTPDepayload GstBaseRTPDepayload;
typedef struct _GstBaseRTPDepayloadClass GstBaseRTPDepayloadClass;
@ -50,6 +55,13 @@ struct _GstBaseRTPDepayload
GstPad *sinkpad, *srcpad;
/* lock to protect the queue */
GStaticRecMutex queuelock;
gboolean thread_running;
/* the releaser thread */
GThread *thread;
// this attribute must be set by the child
guint clock_rate;