gst/tcp/gstmultifdsink.*: Implement stubbed out properties unit-type, units-soft-max, units-max, to allow specifying ...

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_multi_fd_sink_class_init), (get_buffers_max), (find_limits),
(gst_multi_fd_sink_recover_client),
(gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_set_property),
(gst_multi_fd_sink_get_property):
* gst/tcp/gstmultifdsink.h:
Implement stubbed out properties unit-type, units-soft-max,
units-max, to allow specifying maximum sizes in units other than
buffers.
Fixes #355935
This commit is contained in:
Michael Smith 2006-09-19 11:31:06 +00:00
parent 3edec5923c
commit 07e516ea42
3 changed files with 108 additions and 22 deletions

View file

@ -1,3 +1,16 @@
2006-09-19 Michael Smith <msmith@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_multi_fd_sink_class_init), (get_buffers_max), (find_limits),
(gst_multi_fd_sink_recover_client),
(gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_set_property),
(gst_multi_fd_sink_get_property):
* gst/tcp/gstmultifdsink.h:
Implement stubbed out properties unit-type, units-soft-max,
units-max, to allow specifying maximum sizes in units other than
buffers.
Fixes #355935
2006-09-19 Wim Taymans <wim@fluendo.com>
* gst-libs/gst/riff/riff-media.c: (gst_riff_create_audio_caps),

View file

@ -404,20 +404,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
"min number of buffers to queue (-1 = as few as possible)", -1,
G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE));
#if NOT_IMPLEMENTED
g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
g_param_spec_int ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT,
g_param_spec_int64 ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
g_param_spec_int ("units-soft-max", "Units soft max",
g_param_spec_int64 ("units-soft-max", "Units soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
#endif
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
g_param_spec_uint ("buffers-queued", "Buffers queued",
@ -1273,6 +1271,63 @@ find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
/* Get the number of buffers from the buffer queue needed to satisfy
* the maximum max in the configured units.
* If units are not BUFFERS, and there are insufficient buffers in the
* queue to satify the limit, return len(queue) + 1 */
static gint
get_buffers_max (GstMultiFdSink * sink, gint64 max)
{
switch (sink->unit_type) {
case GST_UNIT_TYPE_BUFFERS:
return max;
case GST_UNIT_TYPE_TIME:
{
GstBuffer *buf;
int i;
int len;
gint64 diff;
GstClockTime first = -1;
len = sink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
if (first == -1)
first = GST_BUFFER_TIMESTAMP (buf);
diff = first - GST_BUFFER_TIMESTAMP (buf);
if (diff > max)
return i + 1;
}
}
return len + 1;
}
case GST_UNIT_TYPE_BYTES:
{
GstBuffer *buf;
int i;
int len;
gint acc = 0;
len = sink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
acc += GST_BUFFER_SIZE (buf);
if (acc > max)
return i + 1;
}
return len + 1;
}
default:
return max;
}
}
/* find the positions in the buffer queue where *_min and *_max
* is satisfied
*/
@ -1348,6 +1403,7 @@ find_limits (GstMultiFdSink * sink,
/* take timestamp and save for the base first timestamp */
if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) {
GST_DEBUG_OBJECT (sink, "Ts %lld on buffer", time);
if (first == -1)
first = time;
@ -1358,6 +1414,8 @@ find_limits (GstMultiFdSink * sink,
time_min = -1;
if (time_max != -1 && first - time >= time_max)
max_hit = TRUE;
} else {
GST_DEBUG_OBJECT (sink, "No timestamp on buffer");
}
/* time is OK or unknown, check and increase if not enough bytes */
if (bytes_min != -1) {
@ -1848,12 +1906,13 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break;
case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
/* move to beginning of soft max */
newbufpos = sink->units_soft_max;
newbufpos = get_buffers_max (sink, sink->units_soft_max);
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
newbufpos = MIN (sink->bufqueue->len - 1,
get_buffers_max (sink, sink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
@ -1868,7 +1927,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break;
default:
/* unknown recovery procedure */
newbufpos = sink->units_soft_max;
newbufpos = get_buffers_max (sink, sink->units_soft_max);
break;
}
return newbufpos;
@ -1902,6 +1961,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
gint i;
GTimeVal nowtv;
GstClockTime now;
gint max_buffers, soft_max_buffers;
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
@ -1911,6 +1971,18 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
g_array_prepend_val (sink->bufqueue, buf);
queuelen = sink->bufqueue->len;
if (sink->units_max > 0)
max_buffers = get_buffers_max (sink, sink->units_max);
else
max_buffers = -1;
if (sink->units_soft_max > 0)
soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
soft_max_buffers);
/* then loop over the clients and update the positions */
max_buffer_usage = 0;
for (clients = sink->clients; clients; clients = next) {
@ -1923,7 +1995,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
client->fd.fd, client, client->bufpos);
/* check soft max if needed, recover client */
if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) {
gint newpos;
newpos = gst_multi_fd_sink_recover_client (sink, client);
@ -1939,7 +2011,7 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
}
}
/* check hard max and timeout, remove client */
if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
if ((max_buffers > 0 && client->bufpos >= max_buffers) ||
(sink->timeout > 0
&& now - client->last_activity_time > sink->timeout)) {
/* remove client */
@ -1984,16 +2056,17 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
}
/* now look for sync points and make sure there is at least one
* sync point in the queue. We only do this if the LATEST_KEYFRAME
* mode is selected */
if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) {
* sync point in the queue. We only do this if the LATEST_KEYFRAME or
* BURST_KEYFRAME mode is selected */
if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME ||
sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) {
/* no point in searching beyond the queue length */
gint limit = queuelen;
GstBuffer *buf;
/* no point in searching beyond the soft-max if any. */
if (sink->units_soft_max > 0) {
limit = MIN (limit, sink->units_soft_max);
if (soft_max_buffers) {
limit = MIN (limit, soft_max_buffers);
}
GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
max_buffer_usage);
@ -2336,10 +2409,10 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
multifdsink->unit_type = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multifdsink->units_max = g_value_get_int (value);
multifdsink->units_max = g_value_get_int64 (value);
break;
case PROP_UNITS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int (value);
multifdsink->units_soft_max = g_value_get_int64 (value);
break;
case PROP_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value);
@ -2406,10 +2479,10 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_enum (value, multifdsink->unit_type);
break;
case PROP_UNITS_MAX:
g_value_set_int (value, multifdsink->units_max);
g_value_set_int64 (value, multifdsink->units_max);
break;
case PROP_UNITS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
g_value_set_int64 (value, multifdsink->units_soft_max);
break;
case PROP_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy);

View file

@ -213,8 +213,8 @@ struct _GstMultiFdSink {
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstUnitType unit_type;/* the type of the units */
gint units_max; /* max units to queue for a client */
gint units_soft_max; /* max units a client can lag before recovery starts */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */