diff --git a/ChangeLog b/ChangeLog index 5cae6e7e28..d285d3d8df 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,43 @@ +2007-04-25 Wim Taymans + + * gst/rtpmanager/gstrtpbin.c: (create_rtcp): + fix for pad name change + + * gst/rtpmanager/gstrtpsession.c: (rtcp_thread), + (gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate): + Fix for renamed methods. + + * gst/rtpmanager/rtpsession.c: (rtp_session_init), + (rtp_session_finalize), (rtp_session_set_cname), + (rtp_session_get_cname), (rtp_session_set_name), + (rtp_session_get_name), (rtp_session_set_email), + (rtp_session_get_email), (rtp_session_set_phone), + (rtp_session_get_phone), (rtp_session_set_location), + (rtp_session_get_location), (rtp_session_set_tool), + (rtp_session_get_tool), (rtp_session_set_note), + (rtp_session_get_note), (source_push_rtp), (obtain_source), + (rtp_session_add_source), (rtp_session_get_source_by_ssrc), + (rtp_session_create_source), (rtp_session_process_rtp), + (rtp_session_process_sr), (rtp_session_process_sdes), + (rtp_session_process_rtcp), (rtp_session_send_rtp), + (rtp_session_get_reporting_interval), (session_report_blocks), + (session_sdes), (rtp_session_perform_reporting): + * gst/rtpmanager/rtpsession.h: + Prepare for implementing SSRC sampling. + Create SSRC for the session. + Add methods to set the SDES entries. + fix accounting of senders/receivers. + Implement SR/RR/SDES RTCP reporting. + + * gst/rtpmanager/rtpsource.c: (rtp_source_init), (init_seq), + (rtp_source_process_rtp), (rtp_source_process_sr): + * gst/rtpmanager/rtpsource.h: + Implement extended sequence number. + + * gst/rtpmanager/rtpstats.c: (rtp_stats_calculate_rtcp_interval): + * gst/rtpmanager/rtpstats.h: + Rename some fields. + 2007-04-24 Tim-Philipp Müller * gst/y4m/gsty4mencode.c: (gst_y4m_encode_init), diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 9162d76c4b..215dbc7358 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -959,7 +959,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) /* get rtcp_src pad and store */ session->rtcp_src = - gst_element_get_request_pad (session->session, "rtcp_src"); + gst_element_get_request_pad (session->session, "send_rtcp_src"); if (session->rtcp_src == NULL) goto pad_failed; diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 03b0802b65..b11dbbba2e 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -305,7 +305,7 @@ rtcp_thread (GstRTPSession * rtpsession) gdouble timeout; GstClockTime target; - timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session); + timeout = rtp_session_get_reporting_interval (rtpsession->priv->session); GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout); target = gst_clock_get_time (clock); @@ -318,7 +318,7 @@ rtcp_thread (GstRTPSession * rtpsession) GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout"); /* make the session manager produce RTCP, we ignore the result. */ - rtp_session_produce_rtcp (rtpsession->priv->session); + rtp_session_perform_reporting (rtpsession->priv->session); GST_RTP_SESSION_LOCK (rtpsession); gst_clock_id_unref (id); @@ -472,6 +472,8 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; + GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); + if (rtpsession->send_rtcp_src) { result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -515,6 +517,8 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, if (!gst_structure_get_int (caps_struct, "clock-rate", &result)) goto no_clock_rate; + GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result); + return result; /* ERRORS */ diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index e13f7d6c50..e4925a2af0 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -57,6 +57,9 @@ static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); +static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc, + gboolean * created, RTPArrivalStats * arrival, gboolean rtp); + static void rtp_session_class_init (RTPSessionClass * klass) { @@ -123,18 +126,35 @@ rtp_session_class_init (RTPSessionClass * klass) static void rtp_session_init (RTPSession * sess) { - sess->lock = g_mutex_new (); - sess->ssrcs = - g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); - sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL); + gint i; - /* create an SSRC for this session manager */ - sess->source = rtp_session_create_source (sess); + sess->lock = g_mutex_new (); + sess->key = g_random_int (); + sess->mask_idx = 0; + sess->mask = 0; + + for (i = 0; i < 32; i++) { + sess->ssrcs[i] = + g_hash_table_new_full (NULL, NULL, NULL, + (GDestroyNotify) g_object_unref); + } + sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL); rtp_stats_init_defaults (&sess->stats); + /* create an active SSRC for this session manager */ + sess->source = rtp_session_create_source (sess); + sess->stats.active_sources++; + /* default UDP header length */ sess->header_len = 28; + sess->mtu = 1400; + + /* some default SDES entries */ + //sess->cname = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ()); + sess->cname = g_strdup_printf ("foo@%s", g_get_host_name ()); + sess->name = g_strdup (g_get_real_name ()); + sess->tool = g_strdup ("GStreamer"); GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); } @@ -143,14 +163,20 @@ static void rtp_session_finalize (GObject * object) { RTPSession *sess; + gint i; sess = RTP_SESSION_CAST (object); g_mutex_free (sess->lock); - g_hash_table_destroy (sess->ssrcs); + for (i = 0; i < 32; i++) + g_hash_table_destroy (sess->ssrcs[i]); + g_hash_table_destroy (sess->cnames); g_object_unref (sess->source); + g_free (sess->cname); + g_free (sess->tool); + G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); } @@ -312,6 +338,230 @@ rtp_session_get_rtcp_bandwidth (RTPSession * sess) return sess->stats.rtcp_bandwidth; } +/** + * rtp_session_set_cname: + * @sess: an #RTPSession + * @cname: a CNAME for the session + * + * Set the CNAME for the session. + */ +void +rtp_session_set_cname (RTPSession * sess, const gchar * cname) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->cname); + sess->cname = g_strdup (cname); +} + +/** + * rtp_session_get_cname: + * @sess: an #RTPSession + * + * Get the currently configured CNAME for the session. + * + * Returns: The CNAME. g_free after usage. + */ +gchar * +rtp_session_get_cname (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->cname); +} + +/** + * rtp_session_set_name: + * @sess: an #RTPSession + * @name: a NAME for the session + * + * Set the NAME for the session. + */ +void +rtp_session_set_name (RTPSession * sess, const gchar * name) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->name); + sess->name = g_strdup (name); +} + +/** + * rtp_session_get_name: + * @sess: an #RTPSession + * + * Get the currently configured NAME for the session. + * + * Returns: The NAME. g_free after usage. + */ +gchar * +rtp_session_get_name (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->name); +} + +/** + * rtp_session_set_email: + * @sess: an #RTPSession + * @email: an EMAIL for the session + * + * Set the EMAIL the session. + */ +void +rtp_session_set_email (RTPSession * sess, const gchar * email) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->email); + sess->email = g_strdup (email); +} + +/** + * rtp_session_get_email: + * @sess: an #RTPSession + * + * Get the currently configured EMAIL of the session. + * + * Returns: The EMAIL. g_free after usage. + */ +gchar * +rtp_session_get_email (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->email); +} + +/** + * rtp_session_set_phone: + * @sess: an #RTPSession + * @phone: a PHONE for the session + * + * Set the PHONE the session. + */ +void +rtp_session_set_phone (RTPSession * sess, const gchar * phone) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->phone); + sess->phone = g_strdup (phone); +} + +/** + * rtp_session_get_location: + * @sess: an #RTPSession + * + * Get the currently configured PHONE of the session. + * + * Returns: The PHONE. g_free after usage. + */ +gchar * +rtp_session_get_phone (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->phone); +} + +/** + * rtp_session_set_location: + * @sess: an #RTPSession + * @location: a LOCATION for the session + * + * Set the LOCATION the session. + */ +void +rtp_session_set_location (RTPSession * sess, const gchar * location) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->location); + sess->location = g_strdup (location); +} + +/** + * rtp_session_get_location: + * @sess: an #RTPSession + * + * Get the currently configured LOCATION of the session. + * + * Returns: The LOCATION. g_free after usage. + */ +gchar * +rtp_session_get_location (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->location); +} + +/** + * rtp_session_set_tool: + * @sess: an #RTPSession + * @tool: a TOOL for the session + * + * Set the TOOL the session. + */ +void +rtp_session_set_tool (RTPSession * sess, const gchar * tool) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->tool); + sess->tool = g_strdup (tool); +} + +/** + * rtp_session_get_tool: + * @sess: an #RTPSession + * + * Get the currently configured TOOL of the session. + * + * Returns: The TOOL. g_free after usage. + */ +gchar * +rtp_session_get_tool (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->tool); +} + +/** + * rtp_session_set_note: + * @sess: an #RTPSession + * @note: a NOTE for the session + * + * Set the NOTE the session. + */ +void +rtp_session_set_note (RTPSession * sess, const gchar * note) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->note); + sess->note = g_strdup (note); +} + +/** + * rtp_session_get_note: + * @sess: an #RTPSession + * + * Get the currently configured NOTE of the session. + * + * Returns: The NOTE. g_free after usage. + */ +gchar * +rtp_session_get_note (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->note); +} + static GstFlowReturn source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) { @@ -319,6 +569,8 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (source == session->source) { GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc); + + if (session->callbacks.send_rtp) result = session->callbacks.send_rtp (session, source, buffer, @@ -371,7 +623,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, { RTPSource *source; - source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + source = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc)); if (source == NULL) { /* make new Source in probation and insert */ source = rtp_source_new (ssrc); @@ -392,7 +645,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, /* configure a callback on the source */ rtp_source_set_callbacks (source, &callbacks, sess); - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), + source); /* we have one more source now */ sess->total_sources++; @@ -426,9 +680,12 @@ rtp_session_add_source (RTPSession * sess, RTPSource * src) g_return_val_if_fail (src != NULL, FALSE); RTP_SESSION_LOCK (sess); - find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc)); + find = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (src->ssrc)); if (find == NULL) { - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (src->ssrc), src); /* we have one more source now */ sess->total_sources++; result = TRUE; @@ -501,7 +758,8 @@ rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc) g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); RTP_SESSION_LOCK (sess); - result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + result = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc)); if (result) g_object_ref (result); RTP_SESSION_UNLOCK (sess); @@ -556,11 +814,13 @@ rtp_session_create_source (RTPSession * sess) ssrc = g_random_int (); /* see if it exists in the session, we're done if it doesn't */ - if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL) + if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (ssrc)) == NULL) break; } source = rtp_source_new (ssrc); - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), + source); /* we have one more source now */ sess->total_sources++; RTP_SESSION_UNLOCK (sess); @@ -633,6 +893,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) prevsender = RTP_SOURCE_IS_SENDER (source); prevactive = RTP_SOURCE_IS_ACTIVE (source); + gst_buffer_ref (buffer); + /* let source process the packet */ result = rtp_source_process_rtp (source, buffer, &arrival); @@ -652,10 +914,11 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) if (created) on_new_ssrc (sess, source); - /* for validated sources, we add the CSRCs as well */ if (source->validated) { guint8 i, count; + gboolean created; + /* for validated sources, we add the CSRCs as well */ count = gst_rtp_buffer_get_csrc_count (buffer); for (i = 0; i < count; i++) { @@ -675,6 +938,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) } } } + gst_buffer_unref (buffer); + RTP_SESSION_UNLOCK (sess); return result; @@ -704,17 +969,27 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, guint64 ntptime; guint count, i; RTPSource *source; - gboolean created; + gboolean created, prevsender; gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime, &packet_count, &octet_count); + GST_DEBUG ("got SR packet: SSRC %08x", senderssrc); + RTP_SESSION_LOCK (sess); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + prevsender = RTP_SOURCE_IS_SENDER (source); + /* first update the source */ rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count); + if (prevsender != RTP_SOURCE_IS_SENDER (source)) { + sess->stats.sender_sources++; + GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc, + sess->stats.sender_sources); + } + if (created) on_new_ssrc (sess, source); @@ -785,36 +1060,36 @@ static void rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, RTPArrivalStats * arrival) { - guint chunks, i, j; - gboolean more_chunks, more_items; + guint items, i, j; + gboolean more_items, more_entries; - chunks = gst_rtcp_packet_sdes_get_chunk_count (packet); - GST_DEBUG ("got SDES packet with %d chunks", chunks); + items = gst_rtcp_packet_sdes_get_item_count (packet); + GST_DEBUG ("got SDES packet with %d items", items); - more_chunks = gst_rtcp_packet_sdes_first_chunk (packet); + more_items = gst_rtcp_packet_sdes_first_item (packet); i = 0; - while (more_chunks) { + while (more_items) { guint32 ssrc; ssrc = gst_rtcp_packet_sdes_get_ssrc (packet); - GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc); + GST_DEBUG ("item %d, SSRC %08x", i, ssrc); - more_items = gst_rtcp_packet_sdes_first_item (packet); + more_entries = gst_rtcp_packet_sdes_first_entry (packet); j = 0; - while (more_items) { + while (more_entries) { GstRTCPSDESType type; guint8 len; - gchar *data; + guint8 *data; - gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data); + gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data); - GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data); + GST_DEBUG ("entry %d, type %d, len %d, data %s", j, type, len, data); - more_items = gst_rtcp_packet_sdes_next_item (packet); + more_entries = gst_rtcp_packet_sdes_next_entry (packet); j++; } - more_chunks = gst_rtcp_packet_sdes_next_chunk (packet); + more_items = gst_rtcp_packet_sdes_next_item (packet); i++; } } @@ -906,6 +1181,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) GST_DEBUG ("received RTCP packet"); /* get packet size including header overhead */ + RTP_SESSION_LOCK (sess); size = GST_BUFFER_SIZE (buffer) + sess->header_len; /* update average RTCP packet size */ @@ -914,6 +1190,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) else sess->stats.avg_rtcp_packet_size = (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4; + RTP_SESSION_UNLOCK (sess); /* start processing the compound packet */ more = gst_rtcp_buffer_get_first_packet (buffer, &packet); @@ -972,6 +1249,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + RTP_SESSION_LOCK (sess); source = sess->source; prevsender = RTP_SOURCE_IS_SENDER (source); @@ -981,20 +1259,22 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; + RTP_SESSION_UNLOCK (sess); return result; } /** - * rtp_session_get_rtcp_interval: + * rtp_session_get_reporting_interval: * @sess: an #RTPSession * - * Get the interval for sending out the next RTCP packet + * Get the interval for sending out the next RTCP packet and doing general + * maintenance tasks. * * Returns: an interval in seconds. */ gdouble -rtp_session_get_rtcp_interval (RTPSession * sess) +rtp_session_get_reporting_interval (RTPSession * sess) { gdouble result; @@ -1008,8 +1288,112 @@ rtp_session_get_rtcp_interval (RTPSession * sess) return result; } +typedef struct +{ + RTPSession *sess; + GstBuffer *rtcp; + GstRTCPPacket packet; +} ReportData; + +static void +session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) +{ + RTPSession *sess = data->sess; + RTPSource *own = sess->source; + GstRTCPPacket *packet = &data->packet; + + /* create a new buffer if needed */ + if (data->rtcp == NULL) { + data->rtcp = gst_rtcp_buffer_new (sess->mtu); + + if (RTP_SOURCE_IS_SENDER (own)) { + /* we are a sender, create SR */ + GST_DEBUG ("create SR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); + + /* fill in sender report info */ + gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, + 0, 0, own->stats.packets_sent, own->stats.octets_sent); + } else { + /* we are only receiver, create RR */ + GST_DEBUG ("create RR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet); + gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc); + } + } + if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { + /* only report about other sources */ + if (source != sess->source) { + RTPSourceStats *stats; + guint32 extended_max, expected; + guint32 expected_interval, received_interval; + guint32 lost, lost_interval, fraction; + + stats = &source->stats; + + extended_max = (stats->cycles << 16) + stats->max_seq; + expected = extended_max - stats->base_seq + 1; + + if (expected > stats->packets_received) { + lost = expected - stats->packets_received; + if (lost > 0x7fffff) + lost = 0x7fffff; + } else { + lost = stats->packets_received - expected; + if (lost > 0x800000) + lost = 0x800000; + else + lost = -lost; + } + + expected_interval = expected - stats->prev_expected; + stats->prev_expected = expected; + received_interval = stats->packets_received - stats->prev_received; + stats->prev_received = stats->packets_received; + + lost_interval = expected_interval - received_interval; + + if (expected_interval == 0 || lost_interval <= 0) + fraction = 0; + else + fraction = (lost_interval << 8) / expected_interval; + + GST_DEBUG ("add RR for SSRC %08x", source->ssrc); + /* we scaled the jitter up for additional precision */ + GST_DEBUG ("fraction %d, lost %d, extseq %u, jitter %d", fraction, lost, + extended_max, stats->jitter >> 4); + + /* packet is not yet filled, add report block for this source. */ + gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost, + extended_max, stats->jitter >> 4, 0, 0); + } + } +} + +static void +session_sdes (RTPSession * sess, GstBuffer * buffer) +{ + GstRTCPPacket packet; + + /* add SDES packet */ + gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_SDES, &packet); + + gst_rtcp_packet_sdes_add_item (&packet, sess->source->ssrc); + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_CNAME, + strlen (sess->cname), (guint8 *) sess->cname); + + /* other SDES items must only be added at regular intervals and only when the + * user requests to since it might be a privacy problem */ +#if 0 + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME, + strlen (sess->name), (guint8 *) sess->name); + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL, + strlen (sess->tool), (guint8 *) sess->tool); +#endif +} + /** - * rtp_session_produce_rtcp: + * rtp_session_perform_reporting: * @sess: an #RTPSession * * Instruct the session manager to generate RTCP packets with current stats. @@ -1019,8 +1403,39 @@ rtp_session_get_rtcp_interval (RTPSession * sess) * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_produce_rtcp (RTPSession * sess) +rtp_session_perform_reporting (RTPSession * sess) { - /* FIXME: implement me */ - return GST_FLOW_NOT_SUPPORTED; + GstFlowReturn result = GST_FLOW_OK; + ReportData data; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + + data.sess = sess; + data.rtcp = NULL; + + RTP_SESSION_LOCK (sess); + /* loop over all known sources and do something */ + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) session_report_blocks, &data); + + /* add SDES for this source */ + if (data.rtcp) { + session_sdes (sess, data.rtcp); + sess->stats.sent_rtcp = TRUE; + } + + RTP_SESSION_UNLOCK (sess); + + /* push out the RTCP packet */ + if (data.rtcp) { + /* close the RTCP packet */ + gst_rtcp_buffer_end (data.rtcp); + + if (sess->callbacks.send_rtcp) + result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp, + sess->user_data); + else + gst_buffer_unref (data.rtcp); + } + return result; } diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 46062c9972..493387fa88 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -143,9 +143,24 @@ struct _RTPSession { GMutex *lock; guint header_len; + guint mtu; RTPSource *source; - GHashTable *ssrcs; + + /* info for creating reports */ + gchar *cname; + gchar *name; + gchar *email; + gchar *phone; + gchar *location; + gchar *tool; + gchar *note; + + /* for sender/receiver counting */ + guint32 key; + guint32 mask_idx; + guint32 mask; + GHashTable *ssrcs[32]; GHashTable *cnames; guint total_sources; @@ -184,6 +199,21 @@ gdouble rtp_session_get_bandwidth (RTPSession *sess); void rtp_session_set_rtcp_fraction (RTPSession *sess, gdouble fraction); gdouble rtp_session_get_rtcp_fraction (RTPSession *sess); +void rtp_session_set_cname (RTPSession *sess, const gchar *cname); +gchar* rtp_session_get_cname (RTPSession *sess); +void rtp_session_set_name (RTPSession *sess, const gchar *name); +gchar* rtp_session_get_name (RTPSession *sess); +void rtp_session_set_email (RTPSession *sess, const gchar *email); +gchar* rtp_session_get_email (RTPSession *sess); +void rtp_session_set_phone (RTPSession *sess, const gchar *phone); +gchar* rtp_session_get_phone (RTPSession *sess); +void rtp_session_set_location (RTPSession *sess, const gchar *location); +gchar* rtp_session_get_location (RTPSession *sess); +void rtp_session_set_tool (RTPSession *sess, const gchar *tool); +gchar* rtp_session_get_tool (RTPSession *sess); +void rtp_session_set_note (RTPSession *sess, const gchar *note); +gchar* rtp_session_get_note (RTPSession *sess); + /* handling sources */ gboolean rtp_session_add_source (RTPSession *sess, RTPSource *src); gint rtp_session_get_num_sources (RTPSession *sess); @@ -200,7 +230,7 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer); /* get interval for next RTCP interval */ -gdouble rtp_session_get_rtcp_interval (RTPSession *sess); -GstFlowReturn rtp_session_produce_rtcp (RTPSession *sess); +gdouble rtp_session_get_reporting_interval (RTPSession *sess); +GstFlowReturn rtp_session_perform_reporting (RTPSession *sess); #endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 36f54381c0..43acb0847a 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -70,6 +70,7 @@ rtp_source_init (RTPSource * src) src->clock_rate = -1; src->packets = g_queue_new (); + src->stats.cycles = -1; src->stats.jitter = 0; src->stats.transit = -1; src->stats.curr_sr = 0; @@ -279,6 +280,20 @@ no_clock_rate: } } +static void +init_seq (RTPSource * src, guint16 seq) +{ + src->stats.base_seq = seq; + src->stats.max_seq = seq; + src->stats.bad_seq = RTP_SEQ_MOD + 1; /* so seq == bad_seq is false */ + src->stats.cycles = 0; + src->stats.packets_received = 0; + src->stats.octets_received = 0; + src->stats.bytes_received = 0; + src->stats.prev_received = 0; + src->stats.prev_expected = 0; +} + /** * rtp_source_process_rtp: * @src: an #RTPSource @@ -293,58 +308,108 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { GstFlowReturn result = GST_FLOW_OK; + guint16 seqnr, udelta; + RTPSourceStats *stats; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + stats = &src->stats; + + seqnr = gst_rtp_buffer_get_seq (buffer); + + if (stats->cycles == -1) { + GST_DEBUG ("first buffer"); + /* first time we heard of this source */ + init_seq (src, seqnr); + src->stats.max_seq = seqnr - 1; + src->probation = RTP_DEFAULT_PROBATION; + } + + udelta = seqnr - stats->max_seq; + /* if we are still on probation, check seqnum */ if (src->probation) { - guint16 seqnr, expected; + guint16 expected; - expected = src->stats.max_seqnr + 1; + expected = src->stats.max_seq + 1; /* when in probation, we require consecutive seqnums */ - seqnr = gst_rtp_buffer_get_seq (buffer); if (seqnr == expected) { /* expected packet */ - src->probation--; - src->stats.max_seqnr = seqnr; - GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); + src->probation--; + src->stats.max_seq = seqnr; + if (src->probation == 0) { + GST_DEBUG ("probation done!", src->probation); + init_seq (src, seqnr); + } else { + GstBuffer *q; + + GST_DEBUG ("probation %d: queue buffer", src->probation); + /* when still in probation, keep packets in a list. */ + g_queue_push_tail (src->packets, buffer); + /* remove packets from queue if there are too many */ + while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { + q = g_queue_pop_head (src->packets); + gst_object_unref (q); + } + goto done; + } } else { GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected); src->probation = RTP_DEFAULT_PROBATION; - src->stats.max_seqnr = seqnr; + src->stats.max_seq = seqnr; + goto done; } - } - if (src->probation) { - GstBuffer *q; - - GST_DEBUG ("probation %d: queue buffer", src->probation); - /* when still in probation, keep packets in a list. */ - g_queue_push_tail (src->packets, buffer); - /* remove packets from queue if there are too many */ - while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { - q = g_queue_pop_head (src->packets); - gst_object_unref (q); + } else if (udelta < RTP_MAX_DROPOUT) { + /* in order, with permissible gap */ + if (seqnr < stats->max_seq) { + /* sequence number wrapped - count another 64K cycle. */ + stats->cycles++; + } + stats->max_seq = seqnr; + } else if (udelta <= RTP_SEQ_MOD - RTP_MAX_MISORDER) { + /* the sequence number made a very large jump */ + if (seqnr == stats->bad_seq) { + /* two sequential packets -- assume that the other side + * restarted without telling us so just re-sync + * (i.e., pretend this was the first packet). */ + init_seq (src, seqnr); + } else { + /* unacceptable jump */ + stats->bad_seq = (seqnr + 1) & (RTP_SEQ_MOD - 1); + goto bad_sequence; } } else { - /* we are not in probation */ - src->stats.octetsreceived += arrival->payload_len; - src->stats.bytesreceived += arrival->bytes; - src->stats.packetsreceived++; - src->is_sender = TRUE; - - GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, - src->stats.packetsreceived, src->stats.octetsreceived); - - /* calculate jitter */ - calculate_jitter (src, buffer, arrival); - - /* we're ready to push the RTP packet now */ - result = push_packet (src, buffer); + /* duplicate or reordered packet, will be filtered by jitterbuffer. */ } + + src->stats.octets_received += arrival->payload_len; + src->stats.bytes_received += arrival->bytes; + src->stats.packets_received++; + /* the source that sent the packet must be a sender */ + src->is_sender = TRUE; + src->validated = TRUE; + + GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, + src->stats.packets_received, src->stats.octets_received); + + /* calculate jitter */ + calculate_jitter (src, buffer, arrival); + + /* we're ready to push the RTP packet now */ + result = push_packet (src, buffer); + +done: return result; + + /* ERRORS */ +bad_sequence: + { + GST_WARNING ("unacceptable seqnum received"); + return GST_FLOW_OK; + } } /** @@ -424,6 +489,9 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, curridx = src->stats.curr_sr ^ 1; curr = &src->stats.sr[curridx]; + /* this is a sender now */ + src->is_sender = TRUE; + /* update current */ curr->is_valid = TRUE; curr->ntptime = ntptime; diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index d4ae6f5524..2f997fb5e9 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -31,6 +31,10 @@ #define RTP_NO_PROBATION 0 #define RTP_DEFAULT_PROBATION 2 +#define RTP_SEQ_MOD (1 << 16) +#define RTP_MAX_DROPOUT 3000 +#define RTP_MAX_MISORDER 100 + typedef struct _RTPSource RTPSource; typedef struct _RTPSourceClass RTPSourceClass; @@ -69,7 +73,8 @@ typedef struct _RTPSourceClass RTPSourceClass; * * Returns: a #GstFlowReturn. */ -typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data); +typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, + gpointer user_data); /** * RTPSourceClockRate: @@ -106,19 +111,23 @@ struct _RTPSource { GObject object; /*< private >*/ - RTPSourceCallbacks callbacks; - gpointer user_data; - guint32 ssrc; - gchar *cname; + gint probation; gboolean validated; - gboolean received_bye; - gchar *bye_reason; - gboolean is_csrc; gboolean is_sender; + gchar *cname; + gchar *name; + gchar *email; + gchar *phone; + gchar *location; + gchar *tool; + gchar *note; + gboolean received_bye; + gchar *bye_reason; + gboolean have_rtp_from; GstNetAddress rtp_from; gboolean have_rtcp_from; @@ -129,6 +138,9 @@ struct _RTPSource { GQueue *packets; + RTPSourceCallbacks callbacks; + gpointer user_data; + RTPSourceStats stats; }; @@ -147,6 +159,7 @@ void rtp_source_set_as_csrc (RTPSource *src); void rtp_source_set_rtp_from (RTPSource *src, GstNetAddress *address); void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *address); +/* handling RTP */ GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer); diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c index b9076eacd5..456ed15f80 100644 --- a/gst/rtpmanager/rtpstats.c +++ b/gst/rtpmanager/rtpstats.c @@ -65,7 +65,7 @@ rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender) GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f", senders, receivers, avg_rtcp, sfraction); - if (sfraction <= stats->sender_fraction) { + if (senders > 0 && sfraction <= stats->sender_fraction) { if (sender) { interval = (avg_rtcp * senders) / (stats->sender_fraction * diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index 66aa7bf72e..6432142782 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -92,14 +92,23 @@ typedef struct { * Stats about a source. */ typedef struct { - guint64 packetsreceived; - guint64 prevpacketsreceived; - guint64 octetsreceived; - guint64 bytesreceived; - guint16 max_seqnr; + guint64 packets_received; + guint64 octets_received; + guint64 bytes_received; + + guint32 prev_expected; + guint32 prev_received; + + guint16 max_seq; + guint32 cycles; + guint32 base_seq; + guint32 bad_seq; guint32 transit; guint32 jitter; + guint64 packets_sent; + guint64 octets_sent; + /* when we received stuff */ GstClockTime prev_rtptime; GstClockTime prev_rtcptime;