multihandlesink: rework to use Handle

This commit is contained in:
Thomas Vander Stichele 2012-01-28 11:02:21 +01:00
parent d4429ecef9
commit fa14beb88b
5 changed files with 163 additions and 93 deletions

View file

@ -110,6 +110,8 @@
#include <unistd.h>
#endif
// FIXME: remove
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -195,6 +197,8 @@ static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink,
static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink *
mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client);
static void
gst_multi_fd_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30]);
static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
@ -413,6 +417,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer);
gstmultihandlesink_class->client_get_fd =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd);
gstmultihandlesink_class->handle_debug =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_handle_debug);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
@ -432,7 +438,7 @@ gst_multi_fd_sink_init (GstMultiFdSink * this)
{
this->mode = DEFAULT_MODE;
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
this->handle_hash = g_hash_table_new (g_int_hash, g_int_equal);
this->handle_read = DEFAULT_HANDLE_READ;
}
@ -442,7 +448,7 @@ gst_multi_fd_sink_finalize (GObject * object)
{
GstMultiFdSink *this = GST_MULTI_FD_SINK (object);
g_hash_table_destroy (this->fd_hash);
g_hash_table_destroy (this->handle_hash);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -452,7 +458,13 @@ gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
{
GstTCPClient *tclient = (GstTCPClient *) client;
return tclient->fd.fd;
return tclient->gfd.fd;
}
static void
gst_multi_fd_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
{
g_snprintf (debug, 30, "[fd %5d]", handle.fd);
}
/* "add-full" signal implementation */
@ -469,10 +481,14 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
// FIXME: convert to a function so we can vfunc this
int fd = handle.fd;
gchar debug[30];
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
mhsinkclass->handle_debug (handle, debug);
GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
"min_format %d, min_value %" G_GUINT64_FORMAT
", max_format %d, max_value %" G_GUINT64_FORMAT, mhclient->debug,
", max_format %d, max_value %" G_GUINT64_FORMAT, debug,
sync_method, min_format, min_value, max_format, max_value);
/* do limits check if we can */
@ -485,9 +501,11 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
client = g_new0 (GstTCPClient, 1);
mhclient = (GstMultiHandleClient *) client;
gst_multi_handle_sink_client_init (mhclient, sync_method);
g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
strncpy (mhclient->debug, debug, 30);
client->fd.fd = fd;
gst_poll_fd_init (&client->gfd);
client->gfd.fd = fd;
mhclient->handle.fd = fd;
mhclient->burst_min_format = min_format;
mhclient->burst_min_value = min_value;
mhclient->burst_max_format = max_format;
@ -496,13 +514,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
CLIENTS_LOCK (sink);
/* check the hash to find a duplicate fd */
clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd);
clink = g_hash_table_lookup (sink->handle_hash, &client->gfd.fd);
if (clink != NULL)
goto duplicate;
/* we can add the fd now */
clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
g_hash_table_insert (sink->handle_hash, &client->gfd.fd, clink);
mhsink->clients_cookie++;
/* set the socket to non blocking */
@ -512,13 +530,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
}
/* we always read from a client */
gst_poll_add_fd (sink->fdset, &client->fd);
gst_poll_add_fd (sink->fdset, &client->gfd);
/* we don't try to read from write only fds */
if (sink->handle_read) {
flags = fcntl (fd, F_GETFL, 0);
if ((flags & O_ACCMODE) != O_WRONLY) {
gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
gst_poll_fd_ctl_read (sink->fdset, &client->gfd, TRUE);
}
}
/* figure out the mode, can't use send() for non sockets */
@ -578,12 +596,14 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle)
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
// FIXME: convert to a function so we can vfunc this
gchar debug[30];
int fd = handle.fd;
GST_DEBUG_OBJECT (sink, "%s removing client", fd);
mhsinkclass->handle_debug (handle, debug);
GST_DEBUG_OBJECT (sink, "%s removing client", debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
clink = g_hash_table_lookup (sink->handle_hash, &fd);
if (clink != NULL) {
GstTCPClient *client = (GstTCPClient *) clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@ -591,7 +611,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle)
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"%s Client already disconnecting with status %d",
fd, mhclient->status);
debug, mhclient->status);
goto done;
}
@ -600,7 +620,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle)
// FIXME: specific poll
gst_poll_restart (sink->fdset);
} else {
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug);
}
done:
@ -614,12 +634,18 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink,
{
GList *clink;
// FIXME: convert to a function so we can vfunc this
gchar debug[30];
int fd = handle.fd;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GST_DEBUG_OBJECT (sink, "%s flushing client", fd);
mhsinkclass->handle_debug (handle, debug);
GST_DEBUG_OBJECT (sink, "%s flushing client", debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
clink = g_hash_table_lookup (sink->handle_hash, &fd);
if (clink != NULL) {
GstTCPClient *client = (GstTCPClient *) clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@ -627,7 +653,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink,
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"%s Client already disconnecting with status %d",
fd, mhclient->status);
debug, mhclient->status);
goto done;
}
@ -639,7 +665,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink,
* it might have some buffers to flush in the ->sending queue. */
mhclient->status = GST_CLIENT_STATUS_FLUSHING;
} else {
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug);
}
done:
CLIENTS_UNLOCK (sink);
@ -673,10 +699,16 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, GstMultiSinkHandle handle)
GValueArray *result = NULL;
GList *clink;
// FIXME: convert to a function so we can vfunc this
gchar debug[30];
int fd = handle.fd;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
mhsinkclass->handle_debug (handle, debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
clink = g_hash_table_lookup (sink->handle_hash, &fd);
if (clink == NULL)
goto noclient;
@ -735,7 +767,7 @@ noclient:
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "%s no client with this found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this found!", debug);
result = g_value_array_new (0);
}
@ -755,7 +787,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink);
GstMultiFdSinkClass *fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
int fd = client->fd.fd;
int fd = client->gfd.fd;
if (mhclient->currently_removing) {
GST_WARNING_OBJECT (sink, "%s client is already being removed",
@ -797,7 +829,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
break;
}
gst_poll_remove_fd (mfsink->fdset, &client->fd);
gst_poll_remove_fd (mfsink->fdset, &client->gfd);
g_get_current_time (&now);
mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
@ -824,9 +856,9 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
/* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) {
if (!g_hash_table_remove (mfsink->handle_hash, &client->gfd.fd)) {
GST_WARNING_OBJECT (sink,
"%s error removing client %p from hash", client->fd.fd, client);
"%s error removing client %p from hash", mhclient->debug, client);
}
/* after releasing the lock above, the link could be invalid, more
* precisely, the next and prev pointers could point to invalid list
@ -837,7 +869,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (mfsink, (GstMultiSinkHandle) client->fd.fd);
fclass->removed (mfsink, (GstMultiSinkHandle) client->gfd.fd);
g_free (client);
CLIENTS_UNLOCK (sink);
@ -860,23 +892,24 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
gboolean ret;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
fd = client->fd.fd;
fd = client->gfd.fd;
if (ioctl (fd, FIONREAD, &avail) < 0)
goto ioctl_failed;
GST_DEBUG_OBJECT (sink, "%s select reports client read of %d bytes",
fd, avail);
mhclient->debug, avail);
ret = TRUE;
if (avail == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", fd);
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
} else if (avail < 0) {
GST_WARNING_OBJECT (sink, "%s avail < 0, removing", fd);
GST_WARNING_OBJECT (sink, "%s avail < 0, removing", mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
} else {
@ -891,17 +924,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
gint to_read = MIN (avail, 512);
GST_DEBUG_OBJECT (sink, "%s client wants us to read %d bytes",
fd, to_read);
mhclient->debug, to_read);
nread = read (fd, dummy, to_read);
if (nread < -1) {
GST_WARNING_OBJECT (sink, "%s could not read %d bytes: %s (%d)",
fd, to_read, g_strerror (errno), errno);
mhclient->debug, to_read, g_strerror (errno), errno);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
} else if (nread == 0) {
GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", fd);
GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
@ -916,7 +950,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
ioctl_failed:
{
GST_WARNING_OBJECT (sink, "%s ioctl failed: %s (%d)",
fd, g_strerror (errno), errno);
mhclient->debug, g_strerror (errno), errno);
mhclient->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
}
@ -1066,7 +1100,7 @@ static gboolean
gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
GstTCPClient * client)
{
GstMultiSinkHandle handle = (GstMultiSinkHandle) client->fd.fd;
GstMultiSinkHandle handle = (GstMultiSinkHandle) client->gfd.fd;
gboolean more;
gboolean flushing;
GstClockTime now;
@ -1092,7 +1126,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* client is too fast, remove from write queue until new buffer is
* available */
// FIXME: specific
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
gst_poll_fd_ctl_write (sink->fdset, &client->gfd, FALSE);
//
/* if we flushed out all of the client buffers, we can stop */
if (mhclient->flushcount == 0)
@ -1116,7 +1150,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
} else {
/* cannot send data to this client yet */
// FIXME: specific
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
gst_poll_fd_ctl_write (sink->fdset, &client->gfd, FALSE);
return TRUE;
}
}
@ -1218,13 +1252,14 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* ERRORS */
flushed:
{
GST_DEBUG_OBJECT (sink, "%s flushed, removing", fd);
GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
return FALSE;
}
connection_reset:
{
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", fd);
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
return FALSE;
}
@ -1347,7 +1382,7 @@ restart:
} else if (mhclient->bufpos == 0 || mhclient->new_connection) {
/* can send data to this client now. need to signal the select thread that
* the fd_set changed */
gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
gst_poll_fd_ctl_write (sink->fdset, &client->gfd, TRUE);
need_signal = TRUE;
}
/* keep track of maximum buffer usage */
@ -1516,7 +1551,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
mhclient = (GstMultiHandleClient *) client;
next = g_list_next (clients);
fd = client->fd.fd;
fd = client->gfd.fd;
res = fcntl (fd, F_GETFL, &flags);
if (res == -1) {
@ -1578,25 +1613,25 @@ restart2:
continue;
}
if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
if (gst_poll_fd_has_closed (sink->fdset, &client->gfd)) {
mhclient->status = GST_CLIENT_STATUS_CLOSED;
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
if (gst_poll_fd_has_error (sink->fdset, &client->gfd)) {
GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->gfd.fd);
mhclient->status = GST_CLIENT_STATUS_ERROR;
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
if (gst_poll_fd_can_read (sink->fdset, &client->gfd)) {
/* handle client read */
if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
}
if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
if (gst_poll_fd_can_write (sink->fdset, &client->gfd)) {
/* handle client write */
if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
mhsinkclass->remove_client_link (mhsink, clients);
@ -1658,7 +1693,7 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_boolean (value, multifdsink->handle_read);
break;
case PROP_NUM_FDS:
g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
g_value_set_uint (value, g_hash_table_size (multifdsink->handle_hash));
break;
default:
@ -1711,6 +1746,6 @@ gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink)
gst_poll_free (mfsink->fdset);
mfsink->fdset = NULL;
}
g_hash_table_foreach_remove (mfsink->fd_hash, multifdsink_hash_remove,
g_hash_table_foreach_remove (mfsink->handle_hash, multifdsink_hash_remove,
mfsink);
}

View file

@ -52,7 +52,7 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
typedef struct {
GstMultiHandleClient client;
GstPollFD fd;
GstPollFD gfd;
gboolean is_socket;
} GstTCPClient;
@ -66,7 +66,7 @@ struct _GstMultiFdSink {
GstMultiHandleSink element;
/*< private >*/
GHashTable *fd_hash; /* index on fd to client */
GHashTable *handle_hash; /* index on fd to client */
gint mode;
GstPoll *fdset;

View file

@ -130,6 +130,8 @@ typedef union
/* structure for a client
*/
typedef struct {
GstMultiSinkHandle handle;
gchar debug[30]; /* a debug string used in debug calls to
identify the client */
gint bufpos; /* position of this client in the global queue */
@ -278,6 +280,7 @@ struct _GstMultiHandleSinkClass {
GstBuffer *buffer);
int (*client_get_fd)
(GstMultiHandleClient *client);
void (*handle_debug) (GstMultiSinkHandle handle, gchar debug[30]);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);

View file

@ -106,6 +106,8 @@
#include <gst/gst-i18n-plugin.h>
#include <string.h>
#include "gstmultisocketsink.h"
#include "gsttcp-marshal.h"
@ -156,6 +158,8 @@ static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink *
mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
gchar debug[30]);
static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
@ -358,6 +362,8 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer);
gstmultihandlesink_class->client_get_fd =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
gstmultihandlesink_class->handle_debug =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
@ -375,7 +381,7 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
static void
gst_multi_socket_sink_init (GstMultiSocketSink * this)
{
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
this->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
this->cancellable = g_cancellable_new ();
}
@ -385,7 +391,7 @@ gst_multi_socket_sink_finalize (GObject * object)
{
GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
g_hash_table_destroy (this->socket_hash);
g_hash_table_destroy (this->handle_hash);
if (this->cancellable) {
g_object_unref (this->cancellable);
this->cancellable = NULL;
@ -397,11 +403,16 @@ gst_multi_socket_sink_finalize (GObject * object)
static int
gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
{
GstSocketClient *msclient = (GstSocketClient *) client;
return g_socket_get_fd (msclient->handle.socket);
return g_socket_get_fd (client->handle.socket);
}
static void
gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
{
g_snprintf (debug, 30, "[socket %p]", handle.socket);
}
/* "add-full" signal implementation */
void
gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
@ -412,12 +423,17 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
GstMultiHandleClient *mhclient;
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
gchar debug[30];
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
// FIXME: remove assert
g_assert (G_IS_SOCKET (handle.socket));
mhsinkclass->handle_debug (handle, debug);
GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
"min_format %d, min_value %" G_GUINT64_FORMAT
", max_format %d, max_value %" G_GUINT64_FORMAT, handle.socket,
", max_format %d, max_value %" G_GUINT64_FORMAT, debug,
sync_method, min_format, min_value, max_format, max_value);
/* do limits check if we can */
@ -430,8 +446,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
client = g_new0 (GstSocketClient, 1);
mhclient = (GstMultiHandleClient *) client;
gst_multi_handle_sink_client_init (mhclient, sync_method);
g_snprintf (mhclient->debug, 30, "[socket %p]", handle.socket);
client->handle.socket = G_SOCKET (g_object_ref (handle.socket));
strncpy (mhclient->debug, debug, 30);
mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
mhclient->burst_min_format = min_format;
mhclient->burst_min_value = min_value;
@ -441,13 +457,13 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
CLIENTS_LOCK (sink);
/* check the hash to find a duplicate fd */
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
clink = g_hash_table_lookup (sink->handle_hash, handle.socket);
if (clink != NULL)
goto duplicate;
/* we can add the fd now */
clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
g_hash_table_insert (sink->socket_hash, handle.socket, clink);
g_hash_table_insert (sink->handle_hash, handle.socket, clink);
mhsink->clients_cookie++;
/* set the socket to non blocking */
@ -456,7 +472,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
/* we always read from a client */
if (sink->main_context) {
client->source =
g_socket_create_source (client->handle.socket,
g_socket_create_source (mhclient->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
g_source_set_callback (client->source,
(GSourceFunc) gst_multi_socket_sink_socket_condition,
@ -517,12 +533,14 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
gchar debug[30];
mhsinkclass->handle_debug (handle, debug);
// FIXME; how to vfunc this ?
GST_DEBUG_OBJECT (sink, "[socket %p] removing client", handle.socket);
GST_DEBUG_OBJECT (sink, "%s removing client", debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
clink = g_hash_table_lookup (sink->handle_hash, handle.socket);
if (clink != NULL) {
GstSocketClient *client = clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@ -537,8 +555,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
mhclient->status = GST_CLIENT_STATUS_REMOVED;
mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
} else {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
handle.socket);
GST_WARNING_OBJECT (sink, "%s no client with this socket found!", debug);
}
done:
@ -551,11 +568,17 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
GstMultiSinkHandle handle)
{
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
gchar debug[30];
GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", handle.socket);
mhsinkclass->handle_debug (handle, debug);
GST_DEBUG_OBJECT (sink, "%s flushing client", debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
clink = g_hash_table_lookup (sink->handle_hash, handle.socket);
if (clink != NULL) {
GstSocketClient *client = clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@ -563,7 +586,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"%s Client already disconnecting with status %d",
socket, mhclient->status);
mhclient->debug, mhclient->status);
goto done;
}
@ -575,7 +598,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
* it might have some buffers to flush in the ->sending queue. */
mhclient->status = GST_CLIENT_STATUS_FLUSHING;
} else {
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", socket);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug);
}
done:
CLIENTS_UNLOCK (sink);
@ -590,9 +613,15 @@ gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
GstSocketClient *client;
GstStructure *result = NULL;
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
gchar debug[30];
mhsinkclass->handle_debug (handle, debug);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
clink = g_hash_table_lookup (sink->handle_hash, handle.socket);
if (clink == NULL)
goto noclient;
@ -629,7 +658,7 @@ noclient:
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "%s no client with this found!", socket);
GST_WARNING_OBJECT (sink, "%s no client with this found!", debug);
result = gst_structure_new_empty ("multisocketsink-stats");
}
@ -716,7 +745,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
CLIENTS_UNLOCK (sink);
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, client->handle,
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, mhclient->handle,
mhclient->status);
/* lock again before we remove the client completely */
@ -724,9 +753,9 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
/* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (mssink->socket_hash, client->handle.socket)) {
if (!g_hash_table_remove (mssink->handle_hash, mhclient->handle.socket)) {
GST_WARNING_OBJECT (sink,
"%s error removing client %p from hash", mhclient, client);
"%s error removing client %p from hash", mhclient->debug, client);
}
/* after releasing the lock above, the link could be invalid, more
* precisely, the next and prev pointers could point to invalid list
@ -737,16 +766,18 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (sink, client->handle);
fclass->removed (sink, mhclient->handle);
g_free (client);
CLIENTS_UNLOCK (sink);
/* and the fd is really gone now */
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
client->handle);
g_object_unref (client->handle.socket);
mhclient->handle);
g_assert (G_IS_SOCKET (mhclient->handle.socket));
g_object_unref (mhclient->handle.socket);
g_free (client);
CLIENTS_LOCK (sink);
}
@ -777,12 +808,12 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
navail = g_socket_get_available_bytes (client->handle.socket);
navail = g_socket_get_available_bytes (mhclient->handle.socket);
if (navail < 0)
break;
nread =
g_socket_receive (client->handle.socket, dummy, MIN (navail,
g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
sizeof (dummy)), sink->cancellable, &err);
if (first && nread == 0) {
/* client sent close, so remove it */
@ -1059,7 +1090,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
/* try to write the complete buffer */
wrote =
g_socket_send (client->handle.socket,
g_socket_send (mhclient->handle.socket,
(gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
&err);
gst_buffer_unmap (head, &map);
@ -1100,13 +1131,14 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
/* ERRORS */
flushed:
{
GST_DEBUG_OBJECT (sink, "%s flushed, removing", socket);
GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
return FALSE;
}
connection_reset:
{
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", socket);
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
g_clear_error (&err);
return FALSE;
@ -1114,7 +1146,8 @@ connection_reset:
write_error:
{
GST_WARNING_OBJECT (sink,
"%s could not write, removing client: %s", socket, err->message);
"%s could not write, removing client: %s", mhclient->debug,
err->message);
g_clear_error (&err);
mhclient->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
@ -1232,7 +1265,7 @@ restart:
* the fd_set changed */
if (!client->source) {
client->source =
g_socket_create_source (client->handle.socket,
g_socket_create_source (mhclient->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
sink->cancellable);
g_source_set_callback (client->source,
@ -1333,7 +1366,7 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
clink = g_hash_table_lookup (sink->handle_hash, handle.socket);
if (clink == NULL) {
ret = FALSE;
goto done;
@ -1350,7 +1383,7 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
}
if ((condition & G_IO_ERR)) {
GST_WARNING_OBJECT (sink, "Socket %p has error", mhclient->debug);
GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_ERROR;
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
@ -1469,7 +1502,7 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));
g_hash_table_size (multisocketsink->handle_hash));
break;
default:
@ -1490,13 +1523,13 @@ gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
CLIENTS_LOCK (mssink);
for (clients = mhsink->clients; clients; clients = clients->next) {
GstSocketClient *client;
GstSocketClient *client = clients->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
client = clients->data;
if (client->source)
continue;
client->source =
g_socket_create_source (client->handle.socket,
g_socket_create_source (mhclient->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
mssink->cancellable);
g_source_set_callback (client->source,
@ -1535,7 +1568,7 @@ gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
mssink->main_context = NULL;
}
g_hash_table_foreach_remove (mssink->socket_hash, multisocketsink_hash_remove,
g_hash_table_foreach_remove (mssink->handle_hash, multisocketsink_hash_remove,
mssink);
}

View file

@ -55,7 +55,6 @@ typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass;
typedef struct {
GstMultiHandleClient client;
GstMultiSinkHandle handle;
GSource *source;
} GstSocketClient;
@ -68,7 +67,7 @@ struct _GstMultiSocketSink {
GstMultiHandleSink element;
/*< private >*/
GHashTable *socket_hash; /* index on socket to client */
GHashTable *handle_hash; /* index on socket to client */
GMainContext *main_context;
GCancellable *cancellable;