session: delay allocation of internal source

Allocate the internal source when we receive a caps with the SSRC or when we see
a buffer with the SSRC.
This commit is contained in:
Wim Taymans 2013-07-26 10:24:22 +02:00
parent e0a1ce1291
commit abc90da1dc
2 changed files with 76 additions and 44 deletions

View file

@ -463,8 +463,6 @@ rtp_session_init (RTPSession * sess)
{
gint i;
gchar *str;
guint32 ssrc;
gboolean created;
g_mutex_init (&sess->lock);
sess->key = g_random_int ();
@ -511,9 +509,8 @@ rtp_session_init (RTPSession * sess)
gst_structure_set (sess->sdes, "tool", G_TYPE_STRING, "GStreamer", NULL);
/* create an active SSRC for this session manager */
ssrc = rtp_session_create_new_ssrc (sess);
sess->source = obtain_internal_source (sess, ssrc, &created);
/* this is the SSRC we suggest */
sess->suggested_ssrc = rtp_session_create_new_ssrc (sess);
sess->first_rtcp = TRUE;
sess->next_rtcp_check_time = GST_CLOCK_TIME_NONE;
@ -540,7 +537,6 @@ rtp_session_finalize (GObject * object)
for (i = 0; i < 32; i++)
g_hash_table_destroy (sess->ssrcs[i]);
g_object_unref (sess->source);
g_mutex_clear (&sess->lock);
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
@ -655,7 +651,8 @@ rtp_session_get_property (GObject * object, guint prop_id,
g_value_set_uint (value, rtp_session_suggest_ssrc (sess));
break;
case PROP_INTERNAL_SOURCE:
g_value_set_object (value, sess->source);
/* FIXME, return a random source */
g_value_set_object (value, NULL);
break;
case PROP_BANDWIDTH:
g_value_set_double (value, sess->bandwidth);
@ -1389,26 +1386,6 @@ obtain_internal_source (RTPSession * sess, guint32 ssrc, gboolean * created)
return source;
}
static void
rtp_session_set_internal_ssrc (RTPSession * sess, guint32 ssrc)
{
if (ssrc != sess->source->ssrc) {
g_hash_table_steal (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (sess->source->ssrc));
GST_DEBUG ("setting internal SSRC to %08x", ssrc);
/* After this call, any receiver of the old SSRC either in RTP or RTCP
* packets will timeout on the old SSRC, we could potentially schedule a
* BYE RTCP for the old SSRC... */
sess->source->ssrc = ssrc;
rtp_source_reset (sess->source);
/* rehash with the new SSRC */
g_hash_table_insert (sess->ssrcs[sess->mask_idx],
GINT_TO_POINTER (sess->source->ssrc), sess->source);
}
}
/**
* rtp_session_suggest_ssrc:
* @sess: a #RTPSession
@ -1670,9 +1647,13 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
gst_rtp_buffer_unmap (&rtp);
RTP_SESSION_LOCK (sess);
#if 0
/* FIXME, we should simply not update any stats on the BYE
* internal sources */
/* ignore more RTP packets when we left the session */
if (sess->source->marked_bye)
goto ignore;
#endif
/* update arrival stats */
update_arrival_stats (sess, &arrival, TRUE, buffer, current_time,
@ -1747,6 +1728,7 @@ invalid_packet:
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
#if 0
ignore:
{
RTP_SESSION_UNLOCK (sess);
@ -1754,6 +1736,7 @@ ignore:
GST_DEBUG ("ignoring RTP packet because we are leaving");
return GST_FLOW_OK;
}
#endif
collision:
{
RTP_SESSION_UNLOCK (sess);
@ -1775,16 +1758,23 @@ rtp_session_process_rb (RTPSession * sess, RTPSource * source,
guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
guint8 fractionlost;
gint32 packetslost;
RTPSource *src;
gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
&packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
if (ssrc == sess->source->ssrc) {
/* find our own source */
src = find_source (sess, ssrc);
if (src == NULL)
continue;
if (src->internal) {
/* only deal with report blocks for our session, we update the stats of
* the sender of the RTCP message. We could also compare our stats against
* the other sender to see if we are better or worse. */
/* FIXME, need to keep track who the RB block is from */
rtp_source_process_rb (source, arrival->ntpnstime, fractionlost,
packetslost, exthighestseq, jitter, lsr, dlsr);
}
@ -2146,19 +2136,17 @@ rtp_session_process_fir (RTPSession * sess, guint32 sender_ssrc,
if (!src && sender_ssrc == 1) {
GHashTableIter iter;
if (sess->stats.sender_sources >
RTP_SOURCE_IS_SENDER (sess->source) ? 2 : 1)
/* we can't find the source if there are multiple */
if (sess->stats.sender_sources > sess->stats.internal_sender_sources + 1)
return;
g_hash_table_iter_init (&iter, sess->ssrcs[sess->mask_idx]);
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) & src)) {
if (src != sess->source && rtp_source_is_sender (src))
if (!src->internal && rtp_source_is_sender (src))
break;
src = NULL;
}
}
if (!src)
return;
@ -2283,8 +2271,11 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer,
update_arrival_stats (sess, &arrival, FALSE, buffer, current_time, -1,
ntpnstime);
#if 0
/* FIXME, simply ignore RTCP for iternal sources with BYE */
if (sess->source->sent_bye)
goto ignore;
#endif
/* start processing the compound packet */
gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
@ -2369,6 +2360,7 @@ invalid_packet:
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
#if 0
ignore:
{
RTP_SESSION_UNLOCK (sess);
@ -2377,6 +2369,7 @@ ignore:
GST_DEBUG ("ignoring RTCP packet because we left");
return GST_FLOW_OK;
}
#endif
}
/**
@ -2399,12 +2392,18 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
s = gst_caps_get_structure (caps, 0);
if (gst_structure_get_uint (s, "ssrc", &ssrc))
rtp_session_set_internal_ssrc (sess, ssrc);
if (gst_structure_get_uint (s, "ssrc", &ssrc)) {
RTPSource *source;
gboolean created;
RTP_SESSION_LOCK (sess);
rtp_source_update_caps (sess->source, caps);
RTP_SESSION_UNLOCK (sess);
RTP_SESSION_LOCK (sess);
source = obtain_internal_source (sess, ssrc, &created);
if (source) {
rtp_source_update_caps (source, caps);
g_object_unref (source);
}
RTP_SESSION_UNLOCK (sess);
}
}
/**
@ -2428,14 +2427,36 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
RTPSource *source;
gboolean prevsender;
guint64 oldrate;
GstBuffer *buffer;
GstRTPBuffer rtp = { NULL };
guint32 ssrc;
gboolean created;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR);
GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet");
if (is_list) {
GstBufferList *list = GST_BUFFER_LIST_CAST (data);
buffer = gst_buffer_list_get (list, 0);
if (!buffer)
goto no_buffer;
} else {
buffer = GST_BUFFER_CAST (data);
}
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
goto invalid_packet;
/* get SSRC and look up in session database */
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
gst_rtp_buffer_unmap (&rtp);
RTP_SESSION_LOCK (sess);
source = sess->source;
source = obtain_internal_source (sess, ssrc, &created);
/* update last activity */
source->last_rtp_activity = current_time;
@ -2454,7 +2475,22 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
sess->recalc_bandwidth = TRUE;
RTP_SESSION_UNLOCK (sess);
g_object_unref (source);
return result;
invalid_packet:
{
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
no_buffer:
{
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
GST_DEBUG ("no buffer in list");
return GST_FLOW_OK;
}
}
static void
@ -2640,10 +2676,7 @@ rtp_session_next_timeout (RTPSession * sess, GstClockTime current_time)
}
if (sess->scheduled_bye) {
if (sess->source->sent_bye) {
GST_DEBUG ("we sent BYE already");
interval = GST_CLOCK_TIME_NONE;
} else if (sess->stats.active_sources >= 50) {
if (sess->stats.active_sources >= 50) {
GST_DEBUG ("reconsider BYE, more than 50 sources");
/* reconsider BYE if members >= 50 */
interval = calculate_rtcp_interval (sess, FALSE, TRUE);

View file

@ -198,7 +198,6 @@ struct _RTPSession {
guint rtcp_rr_bandwidth;
guint rtcp_rs_bandwidth;
RTPSource *source;
guint32 suggested_ssrc;
/* for sender/receiver counting */