gst/tcp/gstmultifdsink.*: Make syncing to keyframes actually work for new clients and lagging clients.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_remove_client_link),
(is_sync_frame), (gst_multifdsink_client_queue_buffer),
(gst_multifdsink_new_client),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Make syncing to keyframes actually work for new clients and lagging
clients.
This commit is contained in:
Wim Taymans 2004-09-27 15:09:35 +00:00
parent 6934688b89
commit e3e3775c80
3 changed files with 70 additions and 17 deletions

View file

@ -1,3 +1,16 @@
2004-09-27 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_remove_client_link),
(is_sync_frame), (gst_multifdsink_client_queue_buffer),
(gst_multifdsink_new_client),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Make syncing to keyframes actually work for new clients and lagging
clients.
2004-09-26 Benjamin Otte <in7y118@public.uni-hamburg.de> 2004-09-26 Benjamin Otte <in7y118@public.uni-hamburg.de>
* gst/debug/gstnavigationtest.c: (gst_navigationtest_class_init), * gst/debug/gstnavigationtest.c: (gst_navigationtest_class_init),

View file

@ -410,7 +410,7 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
client->bytes_sent = 0; client->bytes_sent = 0;
client->dropped_buffers = 0; client->dropped_buffers = 0;
client->avg_queue_size = 0; client->avg_queue_size = 0;
client->need_keyunit = sink->sync_clients; client->new_connection = TRUE;
/* update start time */ /* update start time */
g_get_current_time (&now); g_get_current_time (&now);
@ -749,21 +749,21 @@ gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
return TRUE; return TRUE;
} }
static gboolean
is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
return TRUE;
}
return FALSE;
}
static gboolean static gboolean
gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink, gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
GstTCPClient * client, GstBuffer * buffer) GstTCPClient * client, GstBuffer * buffer)
{ {
if (client->need_keyunit) {
GST_LOG_OBJECT (sink, "client with fd %d needs keyunit", client->fd.fd);
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
GST_LOG_OBJECT (sink, "skipping delta unit for fd %d", client->fd.fd);
return TRUE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
GST_LOG_OBJECT (sink, "found key unit for fd %d", client->fd.fd);
client->need_keyunit = FALSE;
}
}
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
guint8 *header; guint8 *header;
guint len; guint len;
@ -785,6 +785,31 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
return TRUE; return TRUE;
} }
static gint
gst_multifdsink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
{
if (sink->sync_clients) {
GstBuffer *buf;
GST_LOG_OBJECT (sink, "New client on fd %d, bufpos %d",
client->fd.fd, client->bufpos);
if (client->bufpos < 0)
return -1;
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
if (is_sync_frame (sink, buf)) {
GST_LOG_OBJECT (sink, "New client on fd %d found sync", client->fd.fd);
return client->bufpos;
} else {
GST_LOG_OBJECT (sink, "New client on fd %d skipping buffer",
client->fd.fd);
client->bufpos--;
return -1;
}
}
return client->bufpos;
}
/* handle a write on a client, /* handle a write on a client,
* which indicates a read request from a client. * which indicates a read request from a client.
@ -874,6 +899,22 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
/* client can pick a buffer from the global queue */ /* client can pick a buffer from the global queue */
GstBuffer *buf; GstBuffer *buf;
/* for new connections, we need to find a good spot in the
* bufqueue to start streaming from */
if (client->new_connection) {
gint position = gst_multifdsink_new_client (sink, client);
if (position > 0) {
/* we got a valid spot in the queue */
client->new_connection = FALSE;
client->bufpos = position;
} else {
/* cannot send data to this client yet */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
return TRUE;
}
}
/* grab buffer */ /* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--; client->bufpos--;
@ -977,7 +1018,7 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break; break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME: case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers */ /* find keyframe in buffers */
newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max); newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max - 1);
while (newbufpos > 0) { while (newbufpos > 0) {
GstBuffer *buf; GstBuffer *buf;
@ -987,6 +1028,7 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
/* found a buffer that is not a delta unit */ /* found a buffer that is not a delta unit */
break; break;
} }
newbufpos--;
} }
break; break;
default: default:
@ -994,8 +1036,6 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
newbufpos = sink->units_soft_max; newbufpos = sink->units_soft_max;
break; break;
} }
/* sync to keyframe if needed */
client->need_keyunit = sink->sync_clients;
return newbufpos; return newbufpos;
} }
@ -1078,7 +1118,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
/* set client to invalid position while being removed */ /* set client to invalid position while being removed */
client->bufpos = -1; client->bufpos = -1;
need_signal = TRUE; need_signal = TRUE;
} else if (client->bufpos == 0) { } else if (client->bufpos == 0 || client->new_connection) {
/* can send data to this client now. need to signal the select thread that /* can send data to this client now. need to signal the select thread that
* the fd_set changed */ * the fd_set changed */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE); gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);

View file

@ -99,7 +99,7 @@ typedef struct {
gboolean caps_sent; gboolean caps_sent;
gboolean streamheader_sent; gboolean streamheader_sent;
gboolean need_keyunit; gboolean new_connection;
/* stats */ /* stats */
guint64 bytes_sent; guint64 bytes_sent;