gst/tcp/: Added burst on connect sync_method, deprecated sync_clients, streamlined the sync code some more.

Original commit message from CVS:
* gst/tcp/.cvsignore:
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_init),
(gst_multifdsink_add), (gst_multifdsink_remove),
(gst_multifdsink_remove_client_link), (is_sync_frame),
(gst_multifdsink_new_client),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
(gst_multifdsink_get_property):
* gst/tcp/gstmultifdsink.h:
Added burst on connect sync_method, deprecated sync_clients,
streamlined the sync code some more.
This commit is contained in:
Wim Taymans 2004-10-29 11:10:38 +00:00
parent 3b2fd2e665
commit 9f38ed64f5
4 changed files with 177 additions and 32 deletions

View file

@ -1,3 +1,19 @@
2004-10-29 Wim Taymans <wim@fluendo.com>
* gst/tcp/.cvsignore:
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_init),
(gst_multifdsink_add), (gst_multifdsink_remove),
(gst_multifdsink_remove_client_link), (is_sync_frame),
(gst_multifdsink_new_client),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
(gst_multifdsink_get_property):
* gst/tcp/gstmultifdsink.h:
Added burst on connect sync_method, deprecated sync_clients,
streamlined the sync code some more.
2004-10-29 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* gst/playback/gstplaybasebin.c: (thread_error), (setup_source),

1
gst/tcp/.gitignore vendored
View file

@ -2,3 +2,4 @@ gsttcp-enumtypes.c
gsttcp-enumtypes.h
gsttcp-marshal.c
gsttcp-marshal.h
fdsetstress

View file

@ -95,7 +95,7 @@ enum
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT 0
#define DEFAULT_SYNC_CLIENTS FALSE
#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_NONE
enum
{
@ -115,7 +115,8 @@ enum
ARG_RECOVER_POLICY,
ARG_TIMEOUT,
ARG_SYNC_CLIENTS,
ARG_SYNC_CLIENTS, /* deprecated */
ARG_SYNC_METHOD,
ARG_BYTES_TO_SERVE,
ARG_BYTES_SERVED,
};
@ -144,6 +145,27 @@ gst_recover_policy_get_type (void)
return recover_policy_type;
}
#define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type())
static GType
gst_sync_method_get_type (void)
{
static GType sync_method_type = 0;
static GEnumValue sync_method[] = {
{GST_SYNC_METHOD_NONE, "GST_SYNC_METHOD_NONE",
"Serve new client the latest buffer"},
{GST_SYNC_METHOD_WAIT, "GST_SYNC_METHOD_WAIT",
"Make the new client wait for the next keyframe"},
{GST_SYNC_METHOD_BURST, "GST_SYNC_METHOD_BURST",
"Serve the new client the last keyframe, aka burst"},
{0, NULL, NULL},
};
if (!sync_method_type) {
sync_method_type = g_enum_register_static ("GstTCPSyncMethod", sync_method);
}
return sync_method_type;
}
#if NOT_IMPLEMENTED
#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
static GType
@ -310,8 +332,12 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_CLIENTS,
g_param_spec_boolean ("sync-clients", "Sync clients",
"Sync clients to a keyframe",
DEFAULT_SYNC_CLIENTS, G_PARAM_READWRITE));
"(DEPRECATED) Sync clients to a keyframe",
DEFAULT_SYNC_METHOD == GST_SYNC_METHOD_WAIT, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD,
g_param_spec_enum ("sync-method", "Sync Method",
"How to sync new clients to the stream",
GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
"Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
@ -386,7 +412,7 @@ gst_multifdsink_init (GstMultiFdSink * this)
this->recover_policy = DEFAULT_RECOVER_POLICY;
this->timeout = DEFAULT_TIMEOUT;
this->sync_clients = DEFAULT_SYNC_CLIENTS;
this->sync_method = DEFAULT_SYNC_METHOD;
}
void
@ -788,27 +814,84 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
static gint
gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
{
if (sink->sync_clients) {
GstBuffer *buf;
gint result;
GST_LOG_OBJECT (sink, "New client on fd %d, bufpos %d",
client->fd.fd, client->bufpos);
switch (sink->sync_method) {
case GST_SYNC_METHOD_WAIT:
{
/* if the buffer at the head of the queue is a sync point we can proceed,
* else we need to skip the buffer and wait for a new one */
GST_LOG_OBJECT (sink,
"New client on fd %d, bufpos %d, waiting for keyframe", client->fd.fd,
client->bufpos);
if (client->bufpos < 0)
return -1;
/* the client is not yet alligned to a buffer */
if (client->bufpos < 0) {
result = -1;
} else {
GstBuffer *buf;
gint i;
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
if (is_sync_frame (sink, buf)) {
GST_LOG_OBJECT (sink, "New client on fd %d found sync", client->fd.fd);
return client->bufpos;
} else {
GST_LOG_OBJECT (sink, "New client on fd %d skipping buffer",
client->fd.fd);
client->bufpos--;
return -1;
for (i = client->bufpos; i >= 0; i--) {
/* get the buffer for the client */
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
GST_LOG_OBJECT (sink, "New client on fd %d found sync",
client->fd.fd);
result = i;
goto done;
} else {
/* client is not on a buffer, need to skip this buffer and
* wait some more */
GST_LOG_OBJECT (sink, "New client on fd %d skipping buffer",
client->fd.fd);
client->bufpos--;
}
}
result = -1;
}
break;
}
case GST_SYNC_METHOD_BURST:
{
/* FIXME for new clients we constantly scan the complete
* buffer queue for sync point whenever a buffer is added. This is
* suboptimal because if we cannot find a sync point the first time,
* the algorithm should behave as GST_SYNC_METHOD_WAIT */
gint i, len;
GST_LOG_OBJECT (sink, "New client on fd %d, bufpos %d, bursting keyframe",
client->fd.fd, client->bufpos);
/* take length of queued buffers */
len = sink->bufqueue->len;
/* assume we don't find a keyframe */
result = -1;
/* then loop over all buffers to find the first keyframe */
for (i = 0; i < len; i++) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
/* found a keyframe, return its position */
GST_LOG_OBJECT (sink, "found keyframe at %d", i);
result = i;
goto done;
}
}
GST_LOG_OBJECT (sink, "no keyframe found");
/* throw client to the waiting state */
client->bufpos = -1;
break;
}
default:
/* no syncing, we are happy with whatever the client is going to get */
GST_LOG_OBJECT (sink, "no client syn needed");
result = client->bufpos;
break;
}
return client->bufpos;
done:
return result;
}
/* handle a write on a client,
@ -904,7 +987,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
if (client->new_connection) {
gint position = gst_multifdsink_new_client (sink, client);
if (position > 0) {
if (position >= 0) {
/* we got a valid spot in the queue */
client->new_connection = FALSE;
client->bufpos = position;
@ -1017,14 +1100,15 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
newbufpos = sink->units_soft_max;
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers */
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
while (newbufpos > 0) {
while (newbufpos >= 0) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_DELTA_UNIT)) {
if (is_sync_frame (sink, buf)) {
/* found a buffer that is not a delta unit */
break;
}
@ -1129,8 +1213,36 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
max_buffer_usage = client->bufpos;
}
}
/* now look for sync points and make sure there is at least one
* sync point in the queue. We only do this if the burst mode
* is enabled. */
if (sink->sync_method == GST_SYNC_METHOD_BURST) {
/* no point in searching beyond the queue length */
gint limit = queuelen;
GstBuffer *buf;
/* no point in searching beyond the soft-max if any. */
if (sink->units_soft_max > 0) {
limit = MIN (limit, sink->units_soft_max);
}
GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
max_buffer_usage);
for (i = 0; i < limit; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
/* found a sync frame, now extend the buffer usage to
* include at least this frame. */
max_buffer_usage = MAX (max_buffer_usage, i);
break;
}
}
GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage);
}
/* nobody is referencing units after max_buffer_usage so we can
* remove them from the queue */
* remove them from the queue. We remove them in reverse order as
* this is the most optimal for GArray. */
for (i = queuelen - 1; i > max_buffer_usage; i--) {
GstBuffer *old;
@ -1142,6 +1254,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
/* unref tail buffer */
gst_buffer_unref (old);
}
/* save for stats */
sink->buffers_queued = max_buffer_usage;
g_mutex_unlock (sink->clientslock);
@ -1396,7 +1509,14 @@ gst_multifdsink_set_property (GObject * object, guint prop_id,
multifdsink->timeout = g_value_get_uint64 (value);
break;
case ARG_SYNC_CLIENTS:
multifdsink->sync_clients = g_value_get_boolean (value);
if (g_value_get_boolean (value) == TRUE) {
multifdsink->sync_method = GST_SYNC_METHOD_WAIT;
} else {
multifdsink->sync_method = GST_SYNC_METHOD_NONE;
}
break;
case ARG_SYNC_METHOD:
multifdsink->sync_method = g_value_get_enum (value);
break;
default:
@ -1452,7 +1572,11 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_uint64 (value, multifdsink->timeout);
break;
case ARG_SYNC_CLIENTS:
g_value_set_boolean (value, multifdsink->sync_clients);
g_value_set_boolean (value,
multifdsink->sync_method == GST_SYNC_METHOD_WAIT);
break;
case ARG_SYNC_METHOD:
g_value_set_enum (value, multifdsink->sync_method);
break;
case ARG_BYTES_TO_SERVE:
g_value_set_uint64 (value, multifdsink->bytes_to_serve);

View file

@ -63,6 +63,13 @@ typedef enum
GST_RECOVER_POLICY_RESYNC_KEYFRAME,
} GstRecoverPolicy;
typedef enum
{
GST_SYNC_METHOD_NONE,
GST_SYNC_METHOD_WAIT,
GST_SYNC_METHOD_BURST,
} GstSyncMethod;
typedef enum
{
GST_UNIT_TYPE_BUFFERS,
@ -127,9 +134,6 @@ struct _GstMultiFdSink {
GstFDSetMode mode;
GstFDSet *fdset;
//fd_set readfds; /* all the client file descriptors that we can read from */
//fd_set writefds; /* all the client file descriptors that we can write to */
GstFD control_sock[2];/* sockets for controlling the select call */
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
@ -146,7 +150,7 @@ struct _GstMultiFdSink {
gint units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */
gboolean sync_clients;/* sync clients to keyframe */
GstSyncMethod sync_method; /* what method to use for connecting clients */
/* stats */
gint buffers_queued; /* number of queued buffers */