session: Rearrange RTCP reporting a little

Make a function to generate an RTCP packet for a source, pass the source as a
parameter.
Move timeout of collisions to session cleanup phase.
This commit is contained in:
Wim Taymans 2013-07-25 23:11:05 +02:00
parent a3bf374351
commit 2163355a47

View file

@ -2649,6 +2649,7 @@ typedef struct
{
GstRTCPBuffer rtcpbuf;
RTPSession *sess;
RTPSource *source;
GstBuffer *rtcp;
GstClockTime current_time;
guint64 ntpnstime;
@ -2659,16 +2660,19 @@ typedef struct
gboolean has_sdes;
gboolean is_early;
gboolean may_suppress;
gboolean notify;
} ReportData;
static void
session_start_rtcp (RTPSession * sess, ReportData * data)
{
GstRTCPPacket *packet = &data->packet;
RTPSource *own = sess->source;
RTPSource *own = data->source;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
data->rtcp = gst_rtcp_buffer_new (sess->mtu);
data->is_bye = FALSE;
data->has_sdes = FALSE;
gst_rtcp_buffer_map (data->rtcp, GST_MAP_READWRITE, rtcp);
@ -2703,19 +2707,11 @@ session_start_rtcp (RTPSession * sess, ReportData * data)
static void
session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
{
RTPSession *sess = data->sess;
GstRTCPPacket *packet = &data->packet;
/* create a new buffer if needed */
if (data->rtcp == NULL) {
session_start_rtcp (sess, data);
} else if (data->is_early) {
/* Put a single RR or SR in minimal compound packets */
return;
}
if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
/* only report about other sender sources */
if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
if (source != data->source && RTP_SOURCE_IS_SENDER (source)) {
guint8 fractionlost;
gint32 packetslost;
guint32 exthighestseq, jitter;
@ -2753,13 +2749,22 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
GstClockTime interval, binterval;
GstClockTime btime;
is_sender = RTP_SOURCE_IS_SENDER (source);
is_active = RTP_SOURCE_IS_ACTIVE (source);
/* check for outdated collisions */
if (source->internal) {
GST_DEBUG ("Timing out collisions");
rtp_source_timeout (source, data->current_time,
/* "a relatively long time" -- RFC 3550 section 8.2 */
RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
data->running_time - sess->rtcp_feedback_retention_window);
}
/* nothing to do when without RTCP */
/* nothing else to do when without RTCP */
if (data->interval == GST_CLOCK_TIME_NONE)
return;
is_sender = RTP_SOURCE_IS_SENDER (source);
is_active = RTP_SOURCE_IS_ACTIVE (source);
/* our own rtcp interval may have been forced low by secondary configuration,
* while sender side may still operate with higher interval,
* so do not just take our interval to decide on timing out sender,
@ -2856,9 +2861,9 @@ session_sdes (RTPSession * sess, ReportData * data)
/* add SDES packet */
gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_SDES, packet);
gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
gst_rtcp_packet_sdes_add_item (packet, data->source->ssrc);
sdes = rtp_source_get_sdes_struct (sess->source);
sdes = rtp_source_get_sdes_struct (data->source);
/* add all fields in the structure, the order is not important. */
n_fields = gst_structure_n_fields (sdes);
@ -2912,18 +2917,13 @@ session_sdes (RTPSession * sess, ReportData * data)
/* schedule a BYE packet */
static void
session_bye (RTPSession * sess, ReportData * data)
make_source_bye (RTPSession * sess, RTPSource * source, ReportData * data)
{
GstRTCPPacket *packet = &data->packet;
GstRTCPBuffer *rtcp = &data->rtcpbuf;
RTPSource *source = sess->source;
/* open packet */
session_start_rtcp (sess, data);
/* add SDES */
session_sdes (sess, data);
/* add a BYE packet */
gst_rtcp_buffer_add_packet (rtcp, GST_RTCP_TYPE_BYE, packet);
gst_rtcp_packet_bye_add_ssrc (packet, source->ssrc);
@ -2932,6 +2932,7 @@ session_bye (RTPSession * sess, ReportData * data)
/* we have a BYE packet now */
data->is_bye = TRUE;
source->sent_bye = TRUE;
}
static gboolean
@ -3027,6 +3028,51 @@ remove_closing_sources (const gchar * key, RTPSource * source, gpointer * data)
return source->closing;
}
static void
generate_rtcp (RTPSource * source, ReportData * data)
{
RTPSession *sess = data->sess;
/* only generate RTCP for active internal sources */
if (!source->internal || source->sent_bye)
return;
data->source = source;
/* open packet */
session_start_rtcp (sess, data);
if (source->marked_bye) {
/* send BYE */
make_source_bye (sess, source, data);
} else if (!data->is_early) {
/* loop over all known sources and add report blocks. If we are ealy, we
* just make a minimal RTCP packet and skip this step */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) session_report_blocks, data);
}
if (!data->has_sdes)
session_sdes (sess, data);
gst_rtcp_buffer_unmap (&data->rtcpbuf);
if (sess->change_ssrc) {
GST_DEBUG ("need to change our SSRC (%08x)", source->ssrc);
g_hash_table_steal (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (source->ssrc));
source->ssrc = rtp_session_create_new_ssrc (sess);
rtp_source_reset (source);
g_hash_table_insert (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (source->ssrc), source);
sess->change_ssrc = FALSE;
data->notify = TRUE;
GST_DEBUG ("changed our SSRC to %08x", source->ssrc);
}
}
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
@ -3053,7 +3099,6 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
ReportData data = { GST_RTCP_BUFFER_INIT };
RTPSource *own;
GHashTable *table_copy;
gboolean notify = FALSE;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
@ -3062,13 +3107,11 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (running_time));
data.sess = sess;
data.rtcp = NULL;
data.current_time = current_time;
data.ntpnstime = ntpnstime;
data.is_bye = FALSE;
data.has_sdes = FALSE;
data.may_suppress = FALSE;
data.running_time = running_time;
data.may_suppress = FALSE;
data.notify = FALSE;
own = sess->source;
@ -3093,88 +3136,49 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime current_time,
(GHRFunc) remove_closing_sources, NULL);
/* see if we need to generate SR or RR packets */
if (is_rtcp_time (sess, current_time, &data)) {
if (own->marked_bye) {
/* generate BYE instead */
GST_DEBUG ("generating BYE message");
session_bye (sess, &data);
own->sent_bye = TRUE;
} else {
/* loop over all known sources and do something */
g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
(GHFunc) session_report_blocks, &data);
}
}
if (!is_rtcp_time (sess, current_time, &data))
goto done;
if (data.rtcp) {
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
if (!data.is_early && !data.may_suppress)
sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
generate_rtcp (own, &data);
/* add SDES for this source when not already added */
if (!data.has_sdes)
session_sdes (sess, &data);
}
/* check for outdated collisions */
GST_DEBUG ("Timing out collisions");
rtp_source_timeout (sess->source, current_time,
/* "a relatively long time" -- RFC 3550 section 8.2 */
RTP_STATS_MIN_INTERVAL * GST_SECOND * 10,
running_time - sess->rtcp_feedback_retention_window);
if (sess->change_ssrc) {
GST_DEBUG ("need to change our SSRC (%08x)", own->ssrc);
g_hash_table_steal (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (own->ssrc));
own->ssrc = rtp_session_create_new_ssrc (sess);
rtp_source_reset (own);
g_hash_table_insert (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (own->ssrc), own);
sess->change_ssrc = FALSE;
notify = TRUE;
GST_DEBUG ("changed our SSRC to %08x", own->ssrc);
}
sess->allow_early = TRUE;
/* we keep track of the last report time in order to timeout inactive
* receivers or senders */
if (!data.is_early && !data.may_suppress)
sess->last_rtcp_send_time = data.current_time;
sess->first_rtcp = FALSE;
sess->next_early_rtcp_time = GST_CLOCK_TIME_NONE;
done:
RTP_SESSION_UNLOCK (sess);
if (notify)
if (data.notify)
g_object_notify (G_OBJECT (sess), "internal-ssrc");
/* push out the RTCP packet */
if (data.rtcp) {
gboolean do_not_suppress;
gst_rtcp_buffer_unmap (&data.rtcpbuf);
GstBuffer *buffer = data.rtcp;
/* Give the user a change to add its own packet */
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SENDING_RTCP], 0,
data.rtcp, data.is_early, &do_not_suppress);
buffer, data.is_early, &do_not_suppress);
if (sess->callbacks.send_rtcp && (do_not_suppress || !data.may_suppress)) {
guint packet_size;
packet_size = gst_buffer_get_size (data.rtcp) + sess->header_len;
packet_size = gst_buffer_get_size (buffer) + sess->header_len;
UPDATE_AVG (sess->stats.avg_rtcp_packet_size, packet_size);
GST_DEBUG ("%p, sending RTCP packet, avg size %u, %u", &sess->stats,
sess->stats.avg_rtcp_packet_size, packet_size);
result =
sess->callbacks.send_rtcp (sess, own, data.rtcp, own->sent_bye,
sess->callbacks.send_rtcp (sess, own, buffer, data.is_bye,
sess->send_rtcp_user_data);
} else {
GST_DEBUG ("freeing packet callback: %p"
" do_not_suppress: %d may_suppress: %d",
sess->callbacks.send_rtcp, do_not_suppress, data.may_suppress);
gst_buffer_unref (data.rtcp);
gst_buffer_unref (buffer);
}
}