rtpbin: store more in the PacketInfo

Store all info in the PacketInfo so that we can avoid mapping the packet
multiple times.
This commit is contained in:
Wim Taymans 2013-09-13 12:22:36 +02:00
parent e5c789abd6
commit a02c9473d8
4 changed files with 75 additions and 72 deletions

View file

@ -1559,11 +1559,26 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
pinfo->bytes += gst_buffer_get_size (*buffer) + pinfo->header_len;
if (pinfo->rtp) {
GstRTPBuffer rtpb = { NULL };
GstRTPBuffer rtp = { NULL };
gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtpb);
pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtpb);
gst_rtp_buffer_unmap (&rtpb);
if (!gst_rtp_buffer_map (*buffer, GST_MAP_READ, &rtp))
goto invalid_packet;
pinfo->payload_len += gst_rtp_buffer_get_payload_len (&rtp);
if (idx == 0) {
gint i;
/* only keep info for first buffer */
pinfo->ssrc = gst_rtp_buffer_get_ssrc (&rtp);
pinfo->seqnum = gst_rtp_buffer_get_seq (&rtp);
pinfo->pt = gst_rtp_buffer_get_payload_type (&rtp);
pinfo->rtptime = gst_rtp_buffer_get_timestamp (&rtp);
/* copy available csrc */
pinfo->csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
for (i = 0; i < pinfo->csrc_count; i++)
pinfo->csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
}
gst_rtp_buffer_unmap (&rtp);
}
if (idx == 0) {
@ -1578,6 +1593,13 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
}
}
return TRUE;
/* ERRORS */
invalid_packet:
{
GST_DEBUG ("invalid RTP packet received");
return FALSE;
}
}
/* update the RTPPacketInfo structure with the current time and other bits
@ -1585,11 +1607,13 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
* This function is typically called when a validated packet is received.
* This function should be called with the SESSION_LOCK
*/
static void
static gboolean
update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
gboolean send, gboolean rtp, gboolean is_list, gpointer data,
GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
{
gboolean res;
pinfo->send = send;
pinfo->rtp = rtp;
pinfo->is_list = is_list;
@ -1603,11 +1627,14 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
if (is_list) {
GstBufferList *list = GST_BUFFER_LIST_CAST (data);
gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet, pinfo);
res =
gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
pinfo);
} else {
GstBuffer *buffer = GST_BUFFER_CAST (data);
update_packet (&buffer, 0, pinfo);
res = update_packet (&buffer, 0, pinfo);
}
return res;
}
static void
@ -1615,6 +1642,10 @@ clean_packet_info (RTPPacketInfo * pinfo)
{
if (pinfo->address)
g_object_unref (pinfo->address);
if (pinfo->data) {
gst_mini_object_unref (pinfo->data);
pinfo->data = NULL;
}
}
static gboolean
@ -1687,35 +1718,19 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
gboolean created;
gboolean prevsender, prevactive;
RTPPacketInfo pinfo = { 0, };
guint32 csrcs[16];
guint8 i, count;
guint64 oldrate;
GstRTPBuffer rtp = { NULL };
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
goto invalid_packet;
/* get SSRC to look up in session database */
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
/* copy available csrc for later */
count = gst_rtp_buffer_get_csrc_count (&rtp);
/* make sure to not overflow our array. An RTP buffer can maximally contain
* 16 CSRCs */
count = MIN (count, 16);
for (i = 0; i < count; i++)
csrcs[i] = gst_rtp_buffer_get_csrc (&rtp, i);
gst_rtp_buffer_unmap (&rtp);
RTP_SESSION_LOCK (sess);
/* update pinfo stats */
update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer, current_time,
running_time, -1);
if (!update_packet_info (sess, &pinfo, FALSE, TRUE, FALSE, buffer,
current_time, running_time, -1))
goto invalid_packet;
ssrc = pinfo.ssrc;
source = obtain_source (sess, ssrc, &created, &pinfo, TRUE);
if (!source)
@ -1726,7 +1741,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
oldrate = source->bitrate;
/* let source process the packet */
result = rtp_source_process_rtp (source, buffer, &pinfo);
result = rtp_source_process_rtp (source, &pinfo);
/* source became active */
if (source_update_active (sess, source, prevactive))
@ -1742,13 +1757,14 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
if (source->validated) {
gboolean created;
gint i;
/* for validated sources, we add the CSRCs as well */
for (i = 0; i < count; i++) {
for (i = 0; i < pinfo.csrc_count; i++) {
guint32 csrc;
RTPSource *csrc_src;
csrc = csrcs[i];
csrc = pinfo.csrcs[i];
/* get source */
csrc_src = obtain_source (sess, csrc, &created, &pinfo, TRUE);
@ -1776,6 +1792,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
invalid_packet:
{
gst_buffer_unref (buffer);
RTP_SESSION_UNLOCK (sess);
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
@ -2391,6 +2408,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
sess->stats.avg_rtcp_packet_size, pinfo.bytes);
RTP_SESSION_UNLOCK (sess);
pinfo.data = NULL;
clean_packet_info (&pinfo);
/* notify caller of sr packets in the callback */

View file

@ -863,33 +863,27 @@ get_clock_rate (RTPSource * src, guint8 payload)
* 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10
* milliseconds. */
static void
calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPPacketInfo * pinfo)
calculate_jitter (RTPSource * src, RTPPacketInfo * pinfo)
{
GstClockTime running_time;
guint32 rtparrival, transit, rtptime;
gint32 diff;
gint clock_rate;
guint8 pt;
GstRTPBuffer rtp = { NULL };
/* get arrival time */
if ((running_time = pinfo->running_time) == GST_CLOCK_TIME_NONE)
goto no_time;
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
goto invalid_packet;
pt = gst_rtp_buffer_get_payload_type (&rtp);
pt = pinfo->pt;
GST_LOG ("SSRC %08x got payload %d", src->ssrc, pt);
/* get clockrate */
if ((clock_rate = get_clock_rate (src, pt)) == -1) {
gst_rtp_buffer_unmap (&rtp);
if ((clock_rate = get_clock_rate (src, pt)) == -1)
goto no_clock_rate;
}
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
rtptime = pinfo->rtptime;
/* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
* care about the absolute value, just the difference. */
@ -918,7 +912,6 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPPacketInfo * pinfo)
GST_LOG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %f",
rtparrival, rtptime, clock_rate, diff, (src->stats.jitter) / 16.0);
gst_rtp_buffer_unmap (&rtp);
return;
/* ERRORS */
@ -927,11 +920,6 @@ no_time:
GST_WARNING ("cannot get current running_time");
return;
}
invalid_packet:
{
GST_WARNING ("invalid RTP packet");
return;
}
no_clock_rate:
{
GST_WARNING ("cannot get clock-rate for pt %d", pt);
@ -992,35 +980,29 @@ do_bitrate_estimation (RTPSource * src, GstClockTime running_time,
/**
* rtp_source_process_rtp:
* @src: an #RTPSource
* @buffer: an RTP buffer
* @pinfo: an #RTPPacketInfo
*
* Let @src handle the incomming RTP @buffer.
* Let @src handle the incomming RTP packet described in @pinfo.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
RTPPacketInfo * pinfo)
rtp_source_process_rtp (RTPSource * src, RTPPacketInfo * pinfo)
{
GstFlowReturn result = GST_FLOW_OK;
guint16 seqnr, udelta;
RTPSourceStats *stats;
guint16 expected;
GstRTPBuffer rtp = { NULL };
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (pinfo != NULL, GST_FLOW_ERROR);
stats = &src->stats;
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
goto invalid_packet;
seqnr = gst_rtp_buffer_get_seq (&rtp);
gst_rtp_buffer_unmap (&rtp);
seqnr = pinfo->seqnum;
if (stats->cycles == -1) {
GST_DEBUG ("received first buffer");
GST_DEBUG ("received first packet");
/* first time we heard of this source */
init_seq (src, seqnr);
src->stats.max_seq = seqnr - 1;
@ -1045,9 +1027,10 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
} else {
GstBuffer *q;
GST_DEBUG ("probation %d: queue buffer", src->curr_probation);
GST_DEBUG ("probation %d: queue packet", src->curr_probation);
/* when still in probation, keep packets in a list. */
g_queue_push_tail (src->packets, buffer);
g_queue_push_tail (src->packets, pinfo->data);
pinfo->data = NULL;
/* remove packets from queue if there are too many */
while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) {
q = g_queue_pop_head (src->packets);
@ -1098,25 +1081,19 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
seqnr, src->stats.packets_received, src->stats.octets_received);
/* calculate jitter for the stats */
calculate_jitter (src, buffer, pinfo);
calculate_jitter (src, pinfo);
/* we're ready to push the RTP packet now */
result = push_packet (src, buffer);
result = push_packet (src, pinfo->data);
pinfo->data = NULL;
done:
return result;
/* ERRORS */
invalid_packet:
{
GST_WARNING ("invalid packet received");
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
bad_sequence:
{
GST_WARNING ("unacceptable seqnum received");
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
probation_seqnum:
@ -1124,7 +1101,6 @@ probation_seqnum:
GST_WARNING ("probation: seqnr %d != expected %d", seqnr, expected);
src->curr_probation = src->probation;
src->stats.max_seq = seqnr;
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
}

View file

@ -229,7 +229,7 @@ void rtp_source_set_rtp_from (RTPSource *src, GSocketAddress *
void rtp_source_set_rtcp_from (RTPSource *src, GSocketAddress *address);
/* handling RTP */
GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPPacketInfo *pinfo);
GstFlowReturn rtp_source_process_rtp (RTPSource *src, RTPPacketInfo *pinfo);
GstFlowReturn rtp_source_send_rtp (RTPSource *src, gpointer data, gboolean is_list,
GstClockTime running_time);

View file

@ -68,6 +68,9 @@ typedef struct {
* @header_len: number of overhead bytes per packet
* @bytes: bytes of the packet including lowlevel overhead
* @payload_len: bytes of the RTP payload
* @seqnum: the seqnum of the packet
* @pt: the payload type of the packet
* @rtptime: the RTP time of the packet
*
* Structure holding information about the packet.
*/
@ -83,6 +86,12 @@ typedef struct {
guint header_len;
guint bytes;
guint payload_len;
guint32 ssrc;
guint16 seqnum;
guint8 pt;
guint32 rtptime;
guint32 csrc_count;
guint32 csrcs[16];
} RTPPacketInfo;
/**