gst/tcp/: Small cleanups in fdset.c

Original commit message from CVS:
* gst/tcp/gstfdset.c: (gst_fdset_fd_has_closed),
(gst_fdset_fd_has_error), (gst_fdset_fd_can_read),
(gst_fdset_fd_can_write), (gst_fdset_wait):
* gst/tcp/gstmultifdsink.c: (gst_client_status_get_type),
(gst_multifdsink_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_get_stats),
(gst_multifdsink_remove_client_link),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_handle_clients),
(gst_multifdsink_close), (gst_multifdsink_change_state):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init),
(gst_tcpserversink_removed):
Small cleanups in fdset.c
Use a hastable to map fd to the client structure for faster
lookup in _remove and get_stats.
Added virtual function to close the fds.
Handle clients even when the select/poll call was unblocked because
of a command.
Implement syncing to keyframe in the recovery procedure.
This commit is contained in:
Wim Taymans 2004-09-17 10:06:52 +00:00
parent 02be6646cc
commit de08f07f19
5 changed files with 154 additions and 64 deletions

View file

@ -1,3 +1,27 @@
2004-09-17 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstfdset.c: (gst_fdset_fd_has_closed),
(gst_fdset_fd_has_error), (gst_fdset_fd_can_read),
(gst_fdset_fd_can_write), (gst_fdset_wait):
* gst/tcp/gstmultifdsink.c: (gst_client_status_get_type),
(gst_multifdsink_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_get_stats),
(gst_multifdsink_remove_client_link),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_handle_clients),
(gst_multifdsink_close), (gst_multifdsink_change_state):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init),
(gst_tcpserversink_removed):
Small cleanups in fdset.c
Use a hastable to map fd to the client structure for faster
lookup in _remove and get_stats.
Added virtual function to close the fds.
Handle clients even when the select/poll call was unblocked because
of a command.
Implement syncing to keyframe in the recovery procedure.
2004-09-16 Iain <iaingnome@gmail.com>
* gst/audioconvert/gstaudioconvert.c (_fixate_caps_to_int): Free the

View file

@ -337,8 +337,9 @@ gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
gint idx = fd->idx;
g_mutex_lock (set->poll_lock);
if (idx >= 0 && idx < set->last_testpollfds)
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLHUP) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
@ -365,8 +366,9 @@ gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
gint idx = fd->idx;
g_mutex_lock (set->poll_lock);
if (idx >= 0 && idx < set->last_testpollfds)
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
@ -393,8 +395,9 @@ gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
gint idx = fd->idx;
g_mutex_lock (set->poll_lock);
if (idx >= 0 && idx < set->last_testpollfds)
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
@ -421,8 +424,9 @@ gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
gint idx = fd->idx;
g_mutex_lock (set->poll_lock);
if (idx >= 0 && idx < set->last_testpollfds)
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLOUT) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}

View file

@ -175,6 +175,7 @@ gst_client_status_get_type (void)
{GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
{GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
{GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
{GST_CLIENT_STATUS_DUPLICATE, "GST_CLIENT_STATUS_DUPLICATE", "Duplicate"},
{0, NULL, NULL},
};
@ -376,6 +377,7 @@ gst_multifdsink_init (GstMultiFdSink * this)
this->clientslock = g_mutex_new ();
this->clients = NULL;
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->unit_type = DEFAULT_UNIT_TYPE;
@ -391,6 +393,7 @@ void
gst_multifdsink_add (GstMultiFdSink * sink, int fd)
{
GstTCPClient *client;
GList *clink;
GTimeVal now;
gint flags, res;
struct stat statbuf;
@ -418,7 +421,21 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
g_mutex_lock (sink->clientslock);
sink->clients = g_list_prepend (sink->clients, client);
/* check the hash to find a duplicate fd */
clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
if (clink != NULL) {
client->status = GST_CLIENT_STATUS_DUPLICATE;
g_mutex_unlock (sink->clientslock);
GST_WARNING_OBJECT (sink, "duplicate client with fd %d found", fd);
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
g_free (client);
return;
}
/* we can add the fd now */
clink = sink->clients = g_list_prepend (sink->clients, client);
g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
/* set the socket to non blocking */
res = fcntl (fd, F_SETFL, O_NONBLOCK);
@ -447,24 +464,20 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
void
gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
{
GList *clients, *next;
GList *clink;
GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
g_mutex_lock (sink->clientslock);
/* loop over the clients to find the one with the fd */
for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
clink = g_hash_table_lookup (sink->fd_hash, &fd);
if (clink != NULL) {
GstTCPClient *client = (GstTCPClient *) clink->data;
client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
if (client->fd.fd == fd) {
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multifdsink_remove_client_link (sink, clients);
SEND_COMMAND (sink, CONTROL_RESTART);
break;
}
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multifdsink_remove_client_link (sink, clink);
SEND_COMMAND (sink, CONTROL_RESTART);
} else {
GST_WARNING_OBJECT (sink, "no client with fd %d found!", fd);
}
g_mutex_unlock (sink->clientslock);
}
@ -493,57 +506,53 @@ gst_multifdsink_clear (GstMultiFdSink * sink)
GValueArray *
gst_multifdsink_get_stats (GstMultiFdSink * sink, int fd)
{
GList *clients;
GstTCPClient *client;
GValueArray *result = NULL;
GList *clink;
g_mutex_lock (sink->clientslock);
/* loop over the clients to find the one with the fd */
for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
clink = g_hash_table_lookup (sink->fd_hash, &fd);
client = (GstTCPClient *) clink->data;
if (client != NULL) {
GValue value = { 0 };
guint64 interval;
client = (GstTCPClient *) clients->data;
result = g_value_array_new (4);
if (client->fd.fd == fd) {
GValue value = { 0 };
guint64 interval;
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->bytes_sent);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->connect_time);
result = g_value_array_append (result, &value);
g_value_unset (&value);
if (client->disconnect_time == 0) {
GTimeVal nowtv;
result = g_value_array_new (4);
g_get_current_time (&nowtv);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->bytes_sent);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->connect_time);
result = g_value_array_append (result, &value);
g_value_unset (&value);
if (client->disconnect_time == 0) {
GTimeVal nowtv;
g_get_current_time (&nowtv);
interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
} else {
interval = client->disconnect_time - client->connect_time;
}
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->disconnect_time);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, interval);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->last_activity_time);
result = g_value_array_append (result, &value);
break;
interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time;
} else {
interval = client->disconnect_time - client->connect_time;
}
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->disconnect_time);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, interval);
result = g_value_array_append (result, &value);
g_value_unset (&value);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, client->last_activity_time);
result = g_value_array_append (result, &value);
}
g_mutex_unlock (sink->clientslock);
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "no client with fd %d found!", fd);
result = g_value_array_new (0);
}
@ -561,6 +570,9 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
int fd;
GTimeVal now;
GstTCPClient *client = (GstTCPClient *) link->data;
GstMultiFdSinkClass *fclass;
fclass = GST_MULTIFDSINK_GET_CLASS (sink);
fd = client->fd.fd;
@ -613,8 +625,15 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
/* lock again before we remove the client completely */
g_mutex_lock (sink->clientslock);
if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
GST_WARNING_OBJECT (sink,
"error removing client %p with fd %d from hash", client, fd);
}
sink->clients = g_list_delete_link (sink->clients, link);
if (fclass->removed)
fclass->removed (sink, client->fd.fd);
g_free (client);
}
@ -735,9 +754,12 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
GstTCPClient * client, GstBuffer * buffer)
{
if (client->need_keyunit) {
GST_LOG_OBJECT (sink, "client with fd %d needs keyunit", client->fd.fd);
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
GST_LOG_OBJECT (sink, "skipping delta unit for fd %d", client->fd.fd);
return TRUE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
GST_LOG_OBJECT (sink, "found key unit for fd %d", client->fd.fd);
client->need_keyunit = FALSE;
}
}
@ -754,6 +776,9 @@ gst_multifdsink_client_queue_buffer (GstMultiFdSink * sink,
gst_multifdsink_client_queue_data (sink, client, header, len);
}
GST_LOG_OBJECT (sink, "Queueing buffer of length %d for fd %d",
GST_BUFFER_SIZE (buffer), client->fd.fd);
gst_buffer_ref (buffer);
client->sending = g_slist_append (client->sending, buffer);
@ -852,6 +877,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
/* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--;
GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
client, fd, client->bufpos);
/* queueing a buffer will ref it */
gst_multifdsink_client_queue_buffer (sink, client, buf);
@ -949,14 +976,26 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
newbufpos = sink->units_soft_max;
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* FIXME, find keyframe in buffers */
newbufpos = sink->units_soft_max;
/* find keyframe in buffers */
newbufpos = MIN (sink->bufqueue->len - 1, sink->units_soft_max);
while (newbufpos > 0) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_DELTA_UNIT)) {
/* found a buffer that is not a delta unit */
break;
}
}
break;
default:
/* unknown recovery procedure */
newbufpos = sink->units_soft_max;
break;
}
/* sync to keyframe if needed */
client->need_keyunit = sink->sync_clients;
return newbufpos;
}
@ -1101,7 +1140,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
* - client socket output (ie, client reads) */
result = gst_fdset_wait (sink->fdset, -1);
/* < 0 is an error, 0 just means a timeout happened */
/* < 0 is an error, 0 just means a timeout happened, which is impossible */
if (result < 0) {
GST_WARNING_OBJECT (sink, "wait failed: %s", g_strerror (errno));
if (errno == EBADF) {
@ -1142,7 +1181,7 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
return;
}
} else {
GST_LOG_OBJECT (sink, "wait done: %d", result);
GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
/* read all commands */
if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
GST_LOG_OBJECT (sink, "have a command");
@ -1161,10 +1200,13 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
case CONTROL_RESTART:
GST_LOG_OBJECT (sink, "restart");
/* need to restart the select call as the fd_set changed */
try_again = TRUE;
/* if other file descriptors than the READ_SOCKET had activity,
* we don't restart just yet, but handle the other clients first */
if (result == 1)
try_again = TRUE;
break;
/* need to restart the select call as the fd_set changed */
case CONTROL_STOP:
/* break out of the select loop */
GST_LOG_OBJECT (sink, "stop");
/* stop this function */
stop = TRUE;
@ -1182,10 +1224,11 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
}
} while (try_again);
/* subclasses can check fdset with this virtual function */
if (fclass->wait)
fclass->wait (sink, sink->fdset);
/* Check the reads */
/* Check the clients */
g_mutex_lock (sink->clientslock);
for (clients = sink->clients; clients; clients = next) {
GstTCPClient *client;
@ -1440,6 +1483,9 @@ gst_multifdsink_close (GstMultiFdSink * this)
this->thread = NULL;
}
/* free the clients */
gst_multifdsink_clear (this);
close (READ_SOCKET (this).fd);
close (WRITE_SOCKET (this).fd);
@ -1489,7 +1535,6 @@ gst_multifdsink_change_state (GstElement * element)
GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN);
}
break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)

View file

@ -77,6 +77,7 @@ typedef enum
GST_CLIENT_STATUS_REMOVED = 2,
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5,
} GstClientStatus;
/* structure for a client
@ -121,6 +122,7 @@ struct _GstMultiFdSink {
GMutex *clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */
GstFDSetMode mode;
GstFDSet *fdset;
@ -165,6 +167,7 @@ struct _GstMultiFdSinkClass {
gboolean (*init) (GstMultiFdSink *sink);
gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set);
gboolean (*close) (GstMultiFdSink *sink);
void (*removed) (GstMultiFdSink *sink, int fd);
/* signals */
void (*client_added) (GstElement *element, gchar *host, gint fd);

View file

@ -61,6 +61,7 @@ static gboolean gst_tcpserversink_handle_wait (GstMultiFdSink * sink,
GstFDSet * set);
static gboolean gst_tcpserversink_init_send (GstMultiFdSink * this);
static gboolean gst_tcpserversink_close (GstMultiFdSink * this);
static void gst_tcpserversink_removed (GstMultiFdSink * sink, int fd);
static void gst_tcpserversink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -131,6 +132,7 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
gstmultifdsink_class->init = gst_tcpserversink_init_send;
gstmultifdsink_class->wait = gst_tcpserversink_handle_wait;
gstmultifdsink_class->close = gst_tcpserversink_close;
gstmultifdsink_class->removed = gst_tcpserversink_removed;
GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink");
}
@ -172,6 +174,18 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
return TRUE;
}
static void
gst_tcpserversink_removed (GstMultiFdSink * sink, int fd)
{
GstTCPServerSink *this = GST_TCPSERVERSINK (sink);
GST_LOG_OBJECT (this, "closing fd %d", fd);
if (close (fd) < 0) {
GST_WARNING_OBJECT (this, "error closing fd %d: %s", fd,
g_strerror (errno));
}
}
static gboolean
gst_tcpserversink_handle_wait (GstMultiFdSink * sink, GstFDSet * set)
{