diff --git a/ChangeLog b/ChangeLog index 1fab94d431..2a327666ea 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2006-09-19 Michael Smith + + * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type), + (gst_multi_fd_sink_class_init), (get_buffers_max), (find_limits), + (gst_multi_fd_sink_recover_client), + (gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_set_property), + (gst_multi_fd_sink_get_property): + * gst/tcp/gstmultifdsink.h: + Implement stubbed out properties unit-type, units-soft-max, + units-max, to allow specifying maximum sizes in units other than + buffers. + Fixes #355935 + 2006-09-19 Wim Taymans * gst-libs/gst/riff/riff-media.c: (gst_riff_create_audio_caps), diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 8304c5d922..8ff1d2f5b4 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -404,20 +404,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) "min number of buffers to queue (-1 = as few as possible)", -1, G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE)); -#if NOT_IMPLEMENTED g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_UNITS_MAX, - g_param_spec_int ("units-max", "Units max", - "max number of units to queue (-1 = no limit)", -1, G_MAXINT, + g_param_spec_int64 ("units-max", "Units max", + "max number of units to queue (-1 = no limit)", -1, G_MAXINT64, DEFAULT_UNITS_MAX, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX, - g_param_spec_int ("units-soft-max", "Units soft max", + g_param_spec_int64 ("units-soft-max", "Units soft max", "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE)); -#endif + G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED, g_param_spec_uint ("buffers-queued", "Buffers queued", @@ -1273,6 +1271,63 @@ find_syncframe (GstMultiFdSink * sink, gint idx, gint direction) #define find_next_syncframe(s,i) find_syncframe(s,i,1) #define find_prev_syncframe(s,i) find_syncframe(s,i,-1) +/* Get the number of buffers from the buffer queue needed to satisfy + * the maximum max in the configured units. + * If units are not BUFFERS, and there are insufficient buffers in the + * queue to satify the limit, return len(queue) + 1 */ +static gint +get_buffers_max (GstMultiFdSink * sink, gint64 max) +{ + switch (sink->unit_type) { + case GST_UNIT_TYPE_BUFFERS: + return max; + case GST_UNIT_TYPE_TIME: + { + GstBuffer *buf; + int i; + int len; + gint64 diff; + GstClockTime first = -1; + + len = sink->bufqueue->len; + + for (i = 0; i < len; i++) { + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + if (first == -1) + first = GST_BUFFER_TIMESTAMP (buf); + + diff = first - GST_BUFFER_TIMESTAMP (buf); + + if (diff > max) + return i + 1; + } + } + return len + 1; + } + case GST_UNIT_TYPE_BYTES: + { + GstBuffer *buf; + int i; + int len; + gint acc = 0; + + len = sink->bufqueue->len; + + for (i = 0; i < len; i++) { + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + acc += GST_BUFFER_SIZE (buf); + + if (acc > max) + return i + 1; + } + return len + 1; + } + default: + return max; + } +} + /* find the positions in the buffer queue where *_min and *_max * is satisfied */ @@ -1348,6 +1403,7 @@ find_limits (GstMultiFdSink * sink, /* take timestamp and save for the base first timestamp */ if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { + GST_DEBUG_OBJECT (sink, "Ts %lld on buffer", time); if (first == -1) first = time; @@ -1358,6 +1414,8 @@ find_limits (GstMultiFdSink * sink, time_min = -1; if (time_max != -1 && first - time >= time_max) max_hit = TRUE; + } else { + GST_DEBUG_OBJECT (sink, "No timestamp on buffer"); } /* time is OK or unknown, check and increase if not enough bytes */ if (bytes_min != -1) { @@ -1848,12 +1906,13 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) break; case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: /* move to beginning of soft max */ - newbufpos = sink->units_soft_max; + newbufpos = get_buffers_max (sink, sink->units_soft_max); break; case GST_RECOVER_POLICY_RESYNC_KEYFRAME: /* find keyframe in buffers, we search backwards to find the * closest keyframe relative to what this client already received. */ - newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1); + newbufpos = MIN (sink->bufqueue->len - 1, + get_buffers_max (sink, sink->units_soft_max) - 1); while (newbufpos >= 0) { GstBuffer *buf; @@ -1868,7 +1927,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) break; default: /* unknown recovery procedure */ - newbufpos = sink->units_soft_max; + newbufpos = get_buffers_max (sink, sink->units_soft_max); break; } return newbufpos; @@ -1902,6 +1961,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) gint i; GTimeVal nowtv; GstClockTime now; + gint max_buffers, soft_max_buffers; g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -1911,6 +1971,18 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) g_array_prepend_val (sink->bufqueue, buf); queuelen = sink->bufqueue->len; + if (sink->units_max > 0) + max_buffers = get_buffers_max (sink, sink->units_max); + else + max_buffers = -1; + + if (sink->units_soft_max > 0) + soft_max_buffers = get_buffers_max (sink, sink->units_soft_max); + else + soft_max_buffers = -1; + GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, + soft_max_buffers); + /* then loop over the clients and update the positions */ max_buffer_usage = 0; for (clients = sink->clients; clients; clients = next) { @@ -1923,7 +1995,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", client->fd.fd, client, client->bufpos); /* check soft max if needed, recover client */ - if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) { + if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) { gint newpos; newpos = gst_multi_fd_sink_recover_client (sink, client); @@ -1939,7 +2011,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) } } /* check hard max and timeout, remove client */ - if ((sink->units_max > 0 && client->bufpos >= sink->units_max) || + if ((max_buffers > 0 && client->bufpos >= max_buffers) || (sink->timeout > 0 && now - client->last_activity_time > sink->timeout)) { /* remove client */ @@ -1984,16 +2056,17 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) } /* now look for sync points and make sure there is at least one - * sync point in the queue. We only do this if the LATEST_KEYFRAME - * mode is selected */ - if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) { + * sync point in the queue. We only do this if the LATEST_KEYFRAME or + * BURST_KEYFRAME mode is selected */ + if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || + sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { /* no point in searching beyond the queue length */ gint limit = queuelen; GstBuffer *buf; /* no point in searching beyond the soft-max if any. */ - if (sink->units_soft_max > 0) { - limit = MIN (limit, sink->units_soft_max); + if (soft_max_buffers) { + limit = MIN (limit, soft_max_buffers); } GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d", max_buffer_usage); @@ -2336,10 +2409,10 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, multifdsink->unit_type = g_value_get_enum (value); break; case PROP_UNITS_MAX: - multifdsink->units_max = g_value_get_int (value); + multifdsink->units_max = g_value_get_int64 (value); break; case PROP_UNITS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int (value); + multifdsink->units_soft_max = g_value_get_int64 (value); break; case PROP_RECOVER_POLICY: multifdsink->recover_policy = g_value_get_enum (value); @@ -2406,10 +2479,10 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, g_value_set_enum (value, multifdsink->unit_type); break; case PROP_UNITS_MAX: - g_value_set_int (value, multifdsink->units_max); + g_value_set_int64 (value, multifdsink->units_max); break; case PROP_UNITS_SOFT_MAX: - g_value_set_int (value, multifdsink->units_soft_max); + g_value_set_int64 (value, multifdsink->units_soft_max); break; case PROP_RECOVER_POLICY: g_value_set_enum (value, multifdsink->recover_policy); diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 7533424597..be4d3a4cb6 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -213,8 +213,8 @@ struct _GstMultiFdSink { /* these values are used to check if a client is reading fast * enough and to control receovery */ GstUnitType unit_type;/* the type of the units */ - gint units_max; /* max units to queue for a client */ - gint units_soft_max; /* max units a client can lag before recovery starts */ + gint64 units_max; /* max units to queue for a client */ + gint64 units_soft_max; /* max units a client can lag before recovery starts */ GstRecoverPolicy recover_policy; GstClockTime timeout; /* max amount of nanoseconds to remain idle */