rtpmux: Aggregate incoming segments

This commit is contained in:
Olivier Crête 2010-05-07 16:42:22 -04:00 committed by Tim-Philipp Müller
parent 7be57cac3a
commit 8e58646f5c
3 changed files with 149 additions and 6 deletions

View file

@ -62,6 +62,7 @@ typedef struct
guint clock_base;
GstCaps *out_caps;
GstSegment segment;
} GstRTPMuxPadPrivate;
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
@ -82,6 +83,7 @@ static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps);
static GstCaps *gst_rtp_mux_getcaps (GstPad * pad);
static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event);
static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
element, GstStateChange transition);
@ -219,6 +221,8 @@ gst_rtp_mux_init (GstRTPMux * object, GstRTPMuxClass * g_class)
object->ssrc = DEFAULT_SSRC;
object->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
object->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
object->segment_pending = TRUE;
}
static void
@ -234,15 +238,15 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps);
if (klass->chain_func)
gst_pad_set_chain_function (sinkpad, klass->chain_func);
if (klass->sink_event_func)
gst_pad_set_event_function (sinkpad, klass->sink_event_func);
gst_pad_set_event_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
/* This could break with gstreamer 0.10.9 */
gst_pad_set_active (sinkpad, TRUE);
gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
gst_pad_set_element_private (sinkpad, padpriv);
/* dd the pad to the element */
gst_pad_set_active (sinkpad, TRUE);
gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
}
@ -318,6 +322,7 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
GstRTPMux *rtp_mux;
GstFlowReturn ret;
GstRTPMuxPadPrivate *padpriv;
GstEvent *newseg_event = NULL;
rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
@ -333,15 +338,34 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
rtp_mux->seqnum++;
gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
padpriv = gst_pad_get_element_private (pad);
if (padpriv)
if (padpriv) {
gst_buffer_set_caps (buffer, padpriv->out_caps);
if (padpriv->segment.format == GST_FORMAT_TIME)
GST_BUFFER_TIMESTAMP (buffer) =
gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
GST_BUFFER_TIMESTAMP (buffer));
}
if (rtp_mux->segment_pending) {
/*
* We set the start at 0, because we re-timestamps to the running time
*/
newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
GST_FORMAT_TIME, 0, -1, 0);
rtp_mux->segment_pending = FALSE;
}
GST_OBJECT_UNLOCK (rtp_mux);
gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc);
gst_rtp_mux_readjust_rtp_timestamp (rtp_mux, pad, buffer);
GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
GST_BUFFER_SIZE (buffer), rtp_mux->seqnum,
gst_rtp_buffer_get_timestamp (buffer));
if (newseg_event)
gst_pad_push_event (rtp_mux->srcpad, newseg_event);
if (!padpriv) {
ret = GST_FLOW_NOT_LINKED;
gst_buffer_unref (buffer);
@ -569,10 +593,103 @@ gst_rtp_mux_set_property (GObject * object,
}
}
static gboolean
gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event)
{
GstRTPMux *mux;
gboolean ret = FALSE;
gboolean forward = TRUE;
mux = GST_RTP_MUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
{
GstRTPMuxPadPrivate *padpriv;
GST_OBJECT_LOCK (mux);
mux->segment_pending = TRUE;
padpriv = gst_pad_get_element_private (pad);
if (padpriv)
gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (pad);
}
break;
case GST_EVENT_NEWSEGMENT:
{
gboolean update;
gdouble rate, applied_rate;
GstFormat format;
gint64 start, stop, position;
GstRTPMuxPadPrivate *padpriv;
gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
&format, &start, &stop, &position);
GST_OBJECT_LOCK (mux);
padpriv = gst_pad_get_element_private (pad);
if (padpriv) {
if (format == GST_FORMAT_TIME)
gst_segment_set_newsegment_full (&padpriv->segment, update,
rate, applied_rate, format, start, stop, position);
else
gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
}
GST_OBJECT_UNLOCK (mux);
gst_event_unref (event);
forward = FALSE;
ret = TRUE;
break;
}
default:
break;
}
if (forward) {
GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (mux);
if (klass->sink_event_func)
klass->sink_event_func (pad, event);
else
ret = gst_pad_push_event (mux->srcpad, event);
}
gst_object_unref (mux);
return ret;
}
static void
clear_segment (gpointer data, gpointer user_data)
{
GstPad *pad = data;
GstRTPMux *mux = user_data;
GstRTPMuxPadPrivate *padpriv;
GST_OBJECT_LOCK (mux);
padpriv = gst_pad_get_element_private (pad);
if (padpriv)
gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (mux);
gst_object_unref (pad);
}
static void
gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
{
GstIterator *iter;
iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
while (gst_iterator_foreach (iter, clear_segment, rtp_mux) ==
GST_ITERATOR_RESYNC);
gst_iterator_free (iter);
GST_OBJECT_LOCK (rtp_mux);
rtp_mux->segment_pending = TRUE;
if (rtp_mux->ssrc == -1)
rtp_mux->current_ssrc = g_random_int ();
@ -589,6 +706,7 @@ gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
rtp_mux->ts_base = g_random_int ();
else
rtp_mux->ts_base = rtp_mux->ts_offset;
GST_DEBUG_OBJECT (rtp_mux, "set clock-base to %u", rtp_mux->ts_base);
GST_OBJECT_UNLOCK (rtp_mux);

View file

@ -58,6 +58,8 @@ struct _GstRTPMux
guint16 seqnum; /* protected by object lock */
guint ssrc;
guint current_ssrc;
gboolean segment_pending;
};
struct _GstRTPMuxClass

View file

@ -60,6 +60,14 @@ setcaps_func (GstPad * pad, GstCaps * caps)
return TRUE;
}
static gboolean
event_func (GstPad * pad, GstEvent * event)
{
gst_event_unref (event);
return TRUE;
}
static void
test_basic (const gchar * elem_name, int count, check_cb cb)
{
@ -74,6 +82,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
GstCaps *src2caps = NULL;
GstCaps *sinkcaps = NULL;
GstCaps *caps;
GstEvent *newsegment;
int i;
rtpmux = gst_check_setup_element (elem_name);
@ -92,6 +101,7 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
gst_pad_set_getcaps_function (src2, getcaps_func);
gst_pad_set_getcaps_function (sink, getcaps_func);
gst_pad_set_setcaps_function (sink, setcaps_func);
gst_pad_set_event_function (sink, event_func);
g_object_set_data (G_OBJECT (src1), "caps", &src1caps);
g_object_set_data (G_OBJECT (src2), "caps", &src2caps);
g_object_set_data (G_OBJECT (sink), "caps", &sinkcaps);
@ -130,8 +140,17 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
"ssrc", G_TYPE_UINT, 66, NULL);
fail_unless (gst_pad_set_caps (src1, caps));
newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME,
100000, -1, 0);
fail_unless (gst_pad_push_event (src1, newsegment));
newsegment = gst_event_new_new_segment (FALSE, 1, GST_FORMAT_TIME,
50000, -1, 0);
fail_unless (gst_pad_push_event (src2, newsegment));
for (i = 0; i < count; i++) {
inbuf = gst_rtp_buffer_new_allocate (10, 0, 0);
GST_BUFFER_TIMESTAMP (inbuf) = i * 1000 + 100000;
GST_BUFFER_DURATION (inbuf) = 1000;
gst_buffer_set_caps (inbuf, caps);
gst_rtp_buffer_set_version (inbuf, 2);
gst_rtp_buffer_set_payload_type (inbuf, 98);
@ -140,6 +159,10 @@ test_basic (const gchar * elem_name, int count, check_cb cb)
gst_rtp_buffer_set_seq (inbuf, 2000 + i);
fail_unless (gst_pad_push (src1, inbuf) == GST_FLOW_OK);
if (buffers)
fail_unless (GST_BUFFER_TIMESTAMP (buffers->data) == i * 1000, "%lld",
GST_BUFFER_TIMESTAMP (buffers->data));
cb (src2, i);
g_list_foreach (buffers, (GFunc) gst_buffer_unref, NULL);