multihandlesink: introduce Handle union

This commit is contained in:
Thomas Vander Stichele 2012-01-27 21:28:05 +01:00
parent 14ac8bb585
commit 64d8ec6459
6 changed files with 228 additions and 220 deletions

View file

@ -457,7 +457,7 @@ gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
/* "add-full" signal implementation */
void
gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value)
{
@ -467,11 +467,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
gint flags;
struct stat statbuf;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
// FIXME: convert to a function so we can vfunc this
int fd = handle.fd;
GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
"min_format %d, min_value %" G_GUINT64_FORMAT
", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
min_format, min_value, max_format, max_value);
", max_format %d, max_value %" G_GUINT64_FORMAT, mhclient->debug,
sync_method, min_format, min_value, max_format, max_value);
/* do limits check if we can */
if (min_format == max_format) {
@ -505,8 +507,8 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
/* set the socket to non blocking */
if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) {
GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd,
g_strerror (errno));
GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s",
mhclient->debug, g_strerror (errno));
}
/* we always read from a client */
@ -538,18 +540,18 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
wrong_limits:
{
GST_WARNING_OBJECT (sink,
"[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
min_value, max_value, min_format);
"%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", unit %d specified when adding client",
mhclient->debug, min_value, max_value, min_format);
return;
}
duplicate:
{
mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
CLIENTS_UNLOCK (sink);
GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", fd);
g_signal_emit (G_OBJECT (sink),
gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, mhclient->debug,
mhclient->status);
g_free (client);
return;
@ -558,26 +560,27 @@ duplicate:
/* "add" signal implementation */
void
gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
gst_multi_fd_sink_add (GstMultiFdSink * sink, GstMultiSinkHandle handle)
{
GstMultiHandleSink *mhsink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
gst_multi_fd_sink_add_full (sink, handle, mhsink->def_sync_method,
mhsink->def_burst_format, mhsink->def_burst_value,
mhsink->def_burst_format, -1);
}
/* "remove" signal implementation */
void
gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle)
{
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
// FIXME: convert to a function so we can vfunc this
int fd = handle.fd;
GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
GST_DEBUG_OBJECT (sink, "%s removing client", fd);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
@ -587,7 +590,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[fd %5d] Client already disconnecting with status %d",
"%s Client already disconnecting with status %d",
fd, mhclient->status);
goto done;
}
@ -597,7 +600,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
// FIXME: specific poll
gst_poll_restart (sink->fdset);
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
}
done:
@ -606,11 +609,14 @@ done:
/* "remove-flush" signal implementation */
void
gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink,
GstMultiSinkHandle handle)
{
GList *clink;
// FIXME: convert to a function so we can vfunc this
int fd = handle.fd;
GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
GST_DEBUG_OBJECT (sink, "%s flushing client", fd);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
@ -620,7 +626,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[fd %5d] Client already disconnecting with status %d",
"%s Client already disconnecting with status %d",
fd, mhclient->status);
goto done;
}
@ -633,7 +639,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
* it might have some buffers to flush in the ->sending queue. */
mhclient->status = GST_CLIENT_STATUS_FLUSHING;
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
}
done:
CLIENTS_UNLOCK (sink);
@ -661,11 +667,13 @@ gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink)
* guint64 : timestamp of the last buffer sent (in nanoseconds)
*/
GValueArray *
gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, GstMultiSinkHandle handle)
{
GstTCPClient *client;
GValueArray *result = NULL;
GList *clink;
// FIXME: convert to a function so we can vfunc this
int fd = handle.fd;
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
@ -727,7 +735,7 @@ noclient:
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
GST_WARNING_OBJECT (sink, "%s no client with this found!", fd);
result = g_value_array_new (0);
}
@ -742,16 +750,12 @@ noclient:
static void
gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
{
int fd;
GTimeVal now;
GstTCPClient *client = (GstTCPClient *) link->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink);
GstMultiFdSinkClass *fclass;
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
fd = client->fd.fd;
GstMultiFdSinkClass *fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
int fd = client->fd.fd;
if (mhclient->currently_removing) {
GST_WARNING_OBJECT (sink, "%s client is already being removed",
@ -822,7 +826,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
* remove it from the hashtable here */
if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) {
GST_WARNING_OBJECT (sink,
"[fd %5d] error removing client %p from hash", client->fd.fd, client);
"%s error removing client %p from hash", client->fd.fd, client);
}
/* after releasing the lock above, the link could be invalid, more
* precisely, the next and prev pointers could point to invalid list
@ -833,7 +837,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (mfsink, client->fd.fd);
fclass->removed (mfsink, (GstMultiSinkHandle) client->fd.fd);
g_free (client);
CLIENTS_UNLOCK (sink);
@ -861,18 +865,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
if (ioctl (fd, FIONREAD, &avail) < 0)
goto ioctl_failed;
GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
GST_DEBUG_OBJECT (sink, "%s select reports client read of %d bytes",
fd, avail);
ret = TRUE;
if (avail == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", fd);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
} else if (avail < 0) {
GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
GST_WARNING_OBJECT (sink, "%s avail < 0, removing", fd);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
} else {
@ -886,18 +890,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
/* this is the maximum we can read */
gint to_read = MIN (avail, 512);
GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
GST_DEBUG_OBJECT (sink, "%s client wants us to read %d bytes",
fd, to_read);
nread = read (fd, dummy, to_read);
if (nread < -1) {
GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
GST_WARNING_OBJECT (sink, "%s could not read %d bytes: %s (%d)",
fd, to_read, g_strerror (errno), errno);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
} else if (nread == 0) {
GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", fd);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
@ -911,7 +915,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
/* ERRORS */
ioctl_failed:
{
GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
GST_WARNING_OBJECT (sink, "%s ioctl failed: %s (%d)",
fd, g_strerror (errno), errno);
mhclient->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
@ -929,7 +933,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
gboolean send_streamheader = FALSE;
GstStructure *s;
GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
GstTCPClient *client = (GstTCPClient *) mhclient;
/* before we queue the buffer, we check if we need to queue streamheader
* buffers (because it's a new client, or because they changed) */
@ -937,8 +940,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
if (!mhclient->caps) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] no previous caps for this client, send streamheader",
client->fd.fd);
"%s no previous caps for this client, send streamheader",
mhclient->debug);
send_streamheader = TRUE;
mhclient->caps = gst_caps_ref (caps);
} else {
@ -951,23 +954,23 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
if (!gst_structure_has_field (s, "streamheader")) {
/* no new streamheader, so nothing new to send */
GST_DEBUG_OBJECT (sink,
"[fd %5d] new caps do not have streamheader, not sending",
client->fd.fd);
"%s new caps do not have streamheader, not sending",
mhclient->debug);
} else {
/* there is a new streamheader */
s = gst_caps_get_structure (mhclient->caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
/* no previous streamheader, so send the new one */
GST_DEBUG_OBJECT (sink,
"[fd %5d] previous caps did not have streamheader, sending",
client->fd.fd);
"%s previous caps did not have streamheader, sending",
mhclient->debug);
send_streamheader = TRUE;
} else {
/* both old and new caps have streamheader set */
if (!mhsink->resend_streamheader) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] asked to not resend the streamheader, not sending",
client->fd.fd);
"%s asked to not resend the streamheader, not sending",
mhclient->debug);
send_streamheader = FALSE;
} else {
sh1 = gst_structure_get_value (s, "streamheader");
@ -975,8 +978,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
sh2 = gst_structure_get_value (s, "streamheader");
if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] new streamheader different from old, sending",
client->fd.fd);
"%s new streamheader different from old, sending",
mhclient->debug);
send_streamheader = TRUE;
}
}
@ -994,16 +997,16 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
int i;
GST_LOG_OBJECT (sink,
"[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
client->fd.fd, caps);
"%s sending streamheader from caps %" GST_PTR_FORMAT,
mhclient->debug, caps);
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
"%s no new streamheader, so nothing to send", mhclient->debug);
} else {
GST_LOG_OBJECT (sink,
"[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
client->fd.fd, caps);
"%s sending streamheader from caps %" GST_PTR_FORMAT,
mhclient->debug, caps);
sh = gst_structure_get_value (s, "streamheader");
g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
buffers = g_value_peek_pointer (sh);
@ -1016,8 +1019,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
buffer = g_value_peek_pointer (bufval);
GST_DEBUG_OBJECT (sink,
"[fd %5d] queueing streamheader buffer of length %" G_GSIZE_FORMAT,
client->fd.fd, gst_buffer_get_size (buffer));
"%s queueing streamheader buffer of length %" G_GSIZE_FORMAT,
mhclient->debug, gst_buffer_get_size (buffer));
gst_buffer_ref (buffer);
mhclient->sending = g_slist_append (mhclient->sending, buffer);
@ -1028,8 +1031,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
gst_caps_unref (caps);
caps = NULL;
GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %" G_GSIZE_FORMAT,
client->fd.fd, gst_buffer_get_size (buffer));
GST_LOG_OBJECT (sink, "%s queueing buffer of length %" G_GSIZE_FORMAT,
mhclient->debug, gst_buffer_get_size (buffer));
gst_buffer_ref (buffer);
mhclient->sending = g_slist_append (mhclient->sending, buffer);
@ -1063,7 +1066,7 @@ static gboolean
gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
GstTCPClient * client)
{
int fd = client->fd.fd;
GstMultiSinkHandle handle = (GstMultiSinkHandle) client->fd.fd;
gboolean more;
gboolean flushing;
GstClockTime now;
@ -1072,7 +1075,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
int fd = handle.fd;
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
@ -1137,8 +1140,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
if (mhclient->flushcount != -1)
mhclient->flushcount--;
GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
fd, client, mhclient->bufpos);
GST_LOG_OBJECT (sink, "%s client %p at position %d",
mhclient->debug, client, mhclient->bufpos);
/* queueing a buffer will ref it */
mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
@ -1191,7 +1194,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* partial write means that the client cannot read more and we should
* stop sending more */
GST_LOG_OBJECT (sink,
"partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote);
"partial write on %d of %" G_GSSIZE_FORMAT " bytes",
mhclient->debug, wrote);
mhclient->bufoffset += wrote;
more = FALSE;
} else {
@ -1214,20 +1218,20 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* ERRORS */
flushed:
{
GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
GST_DEBUG_OBJECT (sink, "%s flushed, removing", fd);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
return FALSE;
}
connection_reset:
{
GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", fd);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
return FALSE;
}
write_error:
{
GST_WARNING_OBJECT (sink,
"[fd %5d] could not write, removing client: %s (%d)", fd,
"%s could not write, removing client: %s (%d)", mhclient->debug,
g_strerror (errno), errno);
mhclient->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
@ -1307,8 +1311,8 @@ restart:
next = g_list_next (clients);
mhclient->bufpos++;
GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
client->fd.fd, client, mhclient->bufpos);
GST_LOG_OBJECT (sink, "%s client %p at position %d",
mhclient->debug, client, mhclient->bufpos);
/* check soft max if needed, recover client */
if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
gint newpos;
@ -1318,12 +1322,11 @@ restart:
mhclient->dropped_buffers += mhclient->bufpos - newpos;
mhclient->bufpos = newpos;
mhclient->discont = TRUE;
GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
client->fd.fd, client, mhclient->bufpos);
GST_INFO_OBJECT (sink, "%s client %p position reset to %d",
mhclient->debug, client, mhclient->bufpos);
} else {
GST_INFO_OBJECT (sink,
"[fd %5d] client %p not recovering position",
client->fd.fd, client);
"%s client %p not recovering position", mhclient->debug, client);
}
}
/* check hard max and timeout, remove client */
@ -1331,8 +1334,8 @@ restart:
(mhsink->timeout > 0
&& now - mhclient->last_activity_time > mhsink->timeout)) {
/* remove client */
GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
client->fd.fd, client);
GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing",
mhclient->debug, client);
/* remove the client, the fd set will be cleared and the select thread
* will be signaled */
mhclient->status = GST_CLIENT_STATUS_SLOW;
@ -1446,6 +1449,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
int fd;
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
@ -1500,7 +1504,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
GstMultiHandleClient *mhclient;
int fd;
long flags;
int res;

View file

@ -80,35 +80,35 @@ struct _GstMultiFdSinkClass {
GstMultiHandleSinkClass parent_class;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
void (*add) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void (*add_full) (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd);
void (*remove_flush) (GstMultiFdSink *sink, int fd);
void (*remove) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void (*remove_flush) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void (*clear) (GstMultiFdSink *sink);
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
GValueArray* (*get_stats) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
/* vtable */
gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set);
void (*removed) (GstMultiFdSink *sink, int fd);
void (*removed) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
/* signals */
void (*client_added) (GstElement *element, gint fd);
void (*client_removed) (GstElement *element, gint fd, GstClientStatus status);
void (*client_fd_removed) (GstElement *element, gint fd);
void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
void (*client_fd_removed) (GstElement *element, GstMultiSinkHandle handle);
};
GType gst_multi_fd_sink_get_type (void);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
void gst_multi_fd_sink_add (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, GstMultiSinkHandle handle);
void gst_multi_fd_sink_clear (GstMultiHandleSink *sink);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, GstMultiSinkHandle handle);
G_END_DECLS

View file

@ -120,6 +120,13 @@ typedef enum
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
// FIXME: is it better to use GSocket * or a gpointer here ?
typedef union
{
int fd;
GSocket *socket;
} GstMultiSinkHandle;
/* structure for a client
*/
typedef struct {
@ -251,12 +258,12 @@ struct _GstMultiHandleSinkClass {
GstBaseSinkClass parent_class;
/* element methods */
void (*add) (GstMultiHandleSink *sink, GSocket *socket);
void (*add_full) (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync,
void (*add) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
void (*add_full) (GstMultiHandleSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
void (*remove_flush) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
void (*clear) (GstMultiHandleSink *sink);
void (*clear_post) (GstMultiHandleSink *sink);
void (*stop_pre) (GstMultiHandleSink *sink);
@ -273,18 +280,18 @@ struct _GstMultiHandleSinkClass {
(GstMultiHandleClient *client);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
void (*remove_client_link) (GstMultiHandleSink * sink, GList * link);
/* vtable */
gboolean (*init) (GstMultiHandleSink *sink);
gboolean (*close) (GstMultiHandleSink *sink);
void (*removed) (GstMultiHandleSink *sink, GSocket *socket);
void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
/* signals */
void (*client_added) (GstElement *element, GSocket *socket);
void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
void (*client_socket_removed) (GstElement *element, GSocket *socket);
void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle);
};
GType gst_multi_handle_sink_get_type (void);

View file

@ -160,8 +160,8 @@ static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiSocketSink * sink);
static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
handle, GIOCondition condition, GstMultiSocketSink * sink);
static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
@ -399,23 +399,25 @@ gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
{
GstSocketClient *msclient = (GstSocketClient *) client;
return g_socket_get_fd (msclient->socket);
return g_socket_get_fd (msclient->handle.socket);
}
/* "add-full" signal implementation */
void
gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value)
gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format,
guint64 min_value, GstFormat max_format, guint64 max_value)
{
GstSocketClient *client;
GstMultiHandleClient *mhclient;
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, "
g_assert (G_IS_SOCKET (handle.socket));
GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
"min_format %d, min_value %" G_GUINT64_FORMAT
", max_format %d, max_value %" G_GUINT64_FORMAT, socket,
", max_format %d, max_value %" G_GUINT64_FORMAT, handle.socket,
sync_method, min_format, min_value, max_format, max_value);
/* do limits check if we can */
@ -428,8 +430,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
client = g_new0 (GstSocketClient, 1);
mhclient = (GstMultiHandleClient *) client;
gst_multi_handle_sink_client_init (mhclient, sync_method);
g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
client->socket = G_SOCKET (g_object_ref (socket));
g_snprintf (mhclient->debug, 30, "[socket %p]", handle.socket);
client->handle.socket = G_SOCKET (g_object_ref (handle.socket));
mhclient->burst_min_format = min_format;
mhclient->burst_min_value = min_value;
@ -439,22 +441,22 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
CLIENTS_LOCK (sink);
/* check the hash to find a duplicate fd */
clink = g_hash_table_lookup (sink->socket_hash, socket);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
if (clink != NULL)
goto duplicate;
/* we can add the fd now */
clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
g_hash_table_insert (sink->socket_hash, socket, clink);
g_hash_table_insert (sink->socket_hash, handle.socket, clink);
mhsink->clients_cookie++;
/* set the socket to non blocking */
g_socket_set_blocking (socket, FALSE);
g_socket_set_blocking (handle.socket, FALSE);
/* we always read from a client */
if (sink->main_context) {
client->source =
g_socket_create_source (client->socket,
g_socket_create_source (client->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
g_source_set_callback (client->source,
(GSourceFunc) gst_multi_socket_sink_socket_condition,
@ -467,7 +469,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
CLIENTS_UNLOCK (sink);
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket);
gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, handle);
return;
@ -475,19 +477,19 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
wrong_limits:
{
GST_WARNING_OBJECT (sink,
"[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", format %d specified when adding client", socket,
min_value, max_value, min_format);
"%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", format %d specified when adding client",
mhclient->debug, min_value, max_value, min_format);
return;
}
duplicate:
{
mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
CLIENTS_UNLOCK (sink);
GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing",
socket);
GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing",
mhclient->debug);
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, handle,
mhclient->status);
g_free (client);
return;
@ -496,37 +498,39 @@ duplicate:
/* "add" signal implementation */
void
gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
gst_multi_socket_sink_add (GstMultiSocketSink * sink, GstMultiSinkHandle handle)
{
GstMultiHandleSink *mhsink;
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
gst_multi_socket_sink_add_full (sink, handle, mhsink->def_sync_method,
mhsink->def_burst_format, mhsink->def_burst_value,
mhsink->def_burst_format, -1);
}
/* "remove" signal implementation */
void
gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
GstMultiSinkHandle handle)
{
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket);
// FIXME; how to vfunc this ?
GST_DEBUG_OBJECT (sink, "[socket %p] removing client", handle.socket);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, socket);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
if (clink != NULL) {
GstSocketClient *client = clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[socket %p] Client already disconnecting with status %d",
socket, mhclient->status);
"%s Client already disconnecting with status %d",
mhclient->debug, mhclient->status);
goto done;
}
@ -534,7 +538,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
} else {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
socket);
handle.socket);
}
done:
@ -543,21 +547,22 @@ done:
/* "remove-flush" signal implementation */
void
gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
GstMultiSinkHandle handle)
{
GList *clink;
GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket);
GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", handle.socket);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, socket);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
if (clink != NULL) {
GstSocketClient *client = clink->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
if (mhclient->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[socket %p] Client already disconnecting with status %d",
"%s Client already disconnecting with status %d",
socket, mhclient->status);
goto done;
}
@ -570,8 +575,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
* it might have some buffers to flush in the ->sending queue. */
mhclient->status = GST_CLIENT_STATUS_FLUSHING;
} else {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!",
socket);
GST_WARNING_OBJECT (sink, "%s no client with this fd found!", socket);
}
done:
CLIENTS_UNLOCK (sink);
@ -580,14 +584,15 @@ done:
/* "get-stats" signal implementation
*/
GstStructure *
gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
GstMultiSinkHandle handle)
{
GstSocketClient *client;
GstStructure *result = NULL;
GList *clink;
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, socket);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
if (clink == NULL)
goto noclient;
@ -624,7 +629,7 @@ noclient:
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket);
GST_WARNING_OBJECT (sink, "%s no client with this found!", socket);
result = gst_structure_new_empty ("multisocketsink-stats");
}
@ -640,7 +645,6 @@ static void
gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link)
{
GSocket *socket;
GTimeVal now;
GstSocketClient *client = link->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@ -649,8 +653,6 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink);
socket = client->socket;
if (mhclient->currently_removing) {
GST_WARNING_OBJECT (sink, "%s client is already being removed",
mhclient->debug);
@ -714,7 +716,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
CLIENTS_UNLOCK (sink);
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, client->handle,
mhclient->status);
/* lock again before we remove the client completely */
@ -722,9 +724,9 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
/* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (mssink->socket_hash, socket)) {
if (!g_hash_table_remove (mssink->socket_hash, client->handle.socket)) {
GST_WARNING_OBJECT (sink,
"[socket %p] error removing client %p from hash", socket, client);
"%s error removing client %p from hash", mhclient, client);
}
/* after releasing the lock above, the link could be invalid, more
* precisely, the next and prev pointers could point to invalid list
@ -735,15 +737,16 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (mssink, socket);
fclass->removed (sink, client->handle);
g_free (client);
CLIENTS_UNLOCK (sink);
/* and the fd is really gone now */
g_signal_emit (G_OBJECT (sink),
gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket);
g_object_unref (socket);
gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
client->handle);
g_object_unref (client->handle.socket);
CLIENTS_LOCK (sink);
}
@ -762,8 +765,7 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
gboolean first = TRUE;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read",
client->socket);
GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
ret = TRUE;
@ -773,25 +775,24 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
do {
gssize navail;
GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
client->socket);
GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
navail = g_socket_get_available_bytes (client->socket);
navail = g_socket_get_available_bytes (client->handle.socket);
if (navail < 0)
break;
nread =
g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
sink->cancellable, &err);
g_socket_receive (client->handle.socket, dummy, MIN (navail,
sizeof (dummy)), sink->cancellable, &err);
if (first && nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing",
client->socket);
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
} else if (nread < 0) {
GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s",
client->socket, err->message);
GST_WARNING_OBJECT (sink, "%s could not read: %s",
mhclient->debug, err->message);
mhclient->status = GST_CLIENT_STATUS_ERROR;
ret = FALSE;
break;
@ -808,7 +809,6 @@ static gboolean
gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient, GstBuffer * buffer)
{
GstSocketClient *client = (GstSocketClient *) mhclient;
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstCaps *caps;
@ -822,8 +822,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
if (!mhclient->caps) {
GST_DEBUG_OBJECT (sink,
"[socket %p] no previous caps for this client, send streamheader",
client->socket);
"%s no previous caps for this client, send streamheader",
mhclient->debug);
send_streamheader = TRUE;
mhclient->caps = gst_caps_ref (caps);
} else {
@ -836,23 +836,23 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
if (!gst_structure_has_field (s, "streamheader")) {
/* no new streamheader, so nothing new to send */
GST_DEBUG_OBJECT (sink,
"[socket %p] new caps do not have streamheader, not sending",
client->socket);
"%s new caps do not have streamheader, not sending",
mhclient->debug);
} else {
/* there is a new streamheader */
s = gst_caps_get_structure (mhclient->caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
/* no previous streamheader, so send the new one */
GST_DEBUG_OBJECT (sink,
"[socket %p] previous caps did not have streamheader, sending",
client->socket);
"%s previous caps did not have streamheader, sending",
mhclient->debug);
send_streamheader = TRUE;
} else {
/* both old and new caps have streamheader set */
if (!mhsink->resend_streamheader) {
GST_DEBUG_OBJECT (sink,
"[socket %p] asked to not resend the streamheader, not sending",
client->socket);
"%s asked to not resend the streamheader, not sending",
mhclient->debug);
send_streamheader = FALSE;
} else {
sh1 = gst_structure_get_value (s, "streamheader");
@ -860,8 +860,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
sh2 = gst_structure_get_value (s, "streamheader");
if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
GST_DEBUG_OBJECT (sink,
"[socket %p] new streamheader different from old, sending",
client->socket);
"%s new streamheader different from old, sending",
mhclient->debug);
send_streamheader = TRUE;
}
}
@ -879,17 +879,16 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
int i;
GST_LOG_OBJECT (sink,
"[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
client->socket, caps);
"%s sending streamheader from caps %" GST_PTR_FORMAT,
mhclient->debug, caps);
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_has_field (s, "streamheader")) {
GST_DEBUG_OBJECT (sink,
"[socket %p] no new streamheader, so nothing to send",
client->socket);
"%s no new streamheader, so nothing to send", mhclient->debug);
} else {
GST_LOG_OBJECT (sink,
"[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
client->socket, caps);
"%s sending streamheader from caps %" GST_PTR_FORMAT,
mhclient->debug, caps);
sh = gst_structure_get_value (s, "streamheader");
g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
buffers = g_value_peek_pointer (sh);
@ -902,8 +901,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
buffer = g_value_peek_pointer (bufval);
GST_DEBUG_OBJECT (sink,
"[socket %p] queueing streamheader buffer of length %"
G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer));
"%s queueing streamheader buffer of length %"
G_GSIZE_FORMAT, mhclient->debug, gst_buffer_get_size (buffer));
gst_buffer_ref (buffer);
mhclient->sending = g_slist_append (mhclient->sending, buffer);
@ -915,7 +914,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
caps = NULL;
GST_LOG_OBJECT (sink,
"[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket,
"%s queueing buffer of length %" G_GSIZE_FORMAT, mhclient->debug,
gst_buffer_get_size (buffer));
gst_buffer_ref (buffer);
@ -950,7 +949,6 @@ static gboolean
gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
GstSocketClient * client)
{
GSocket *socket = client->socket;
gboolean more;
gboolean flushing;
GstClockTime now;
@ -1034,7 +1032,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
if (mhclient->flushcount != -1)
mhclient->flushcount--;
GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
GST_LOG_OBJECT (sink, "%s client %p at position %d",
socket, client, mhclient->bufpos);
/* queueing a buffer will ref it */
@ -1061,8 +1059,9 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
/* try to write the complete buffer */
wrote =
g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset,
maxsize, sink->cancellable, &err);
g_socket_send (client->handle.socket,
(gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
&err);
gst_buffer_unmap (head, &map);
if (wrote < 0) {
@ -1101,14 +1100,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
/* ERRORS */
flushed:
{
GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket);
GST_DEBUG_OBJECT (sink, "%s flushed, removing", socket);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
return FALSE;
}
connection_reset:
{
GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing",
socket);
GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", socket);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
g_clear_error (&err);
return FALSE;
@ -1116,8 +1114,7 @@ connection_reset:
write_error:
{
GST_WARNING_OBJECT (sink,
"[socket %p] could not write, removing client: %s", socket,
err->message);
"%s could not write, removing client: %s", socket, err->message);
g_clear_error (&err);
mhclient->status = GST_CLIENT_STATUS_ERROR;
return FALSE;
@ -1198,8 +1195,8 @@ restart:
next = g_list_next (clients);
mhclient->bufpos++;
GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
client->socket, client, mhclient->bufpos);
GST_LOG_OBJECT (sink, "%s client %p at position %d",
mhclient->debug, client, mhclient->bufpos);
/* check soft max if needed, recover client */
if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
gint newpos;
@ -1209,12 +1206,11 @@ restart:
mhclient->dropped_buffers += mhclient->bufpos - newpos;
mhclient->bufpos = newpos;
mhclient->discont = TRUE;
GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d",
client->socket, client, mhclient->bufpos);
GST_INFO_OBJECT (sink, "%s client %p position reset to %d",
mhclient->debug, client, mhclient->bufpos);
} else {
GST_INFO_OBJECT (sink,
"[socket %p] client %p not recovering position",
client->socket, client);
"%s client %p not recovering position", mhclient->debug, client);
}
}
/* check hard max and timeout, remove client */
@ -1222,8 +1218,8 @@ restart:
(mhsink->timeout > 0
&& now - mhclient->last_activity_time > mhsink->timeout)) {
/* remove client */
GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing",
client->socket, client);
GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing",
mhclient->debug, client);
/* remove the client, the fd set will be cleared and the select thread
* will be signaled */
mhclient->status = GST_CLIENT_STATUS_SLOW;
@ -1236,7 +1232,7 @@ restart:
* the fd_set changed */
if (!client->source) {
client->source =
g_socket_create_source (client->socket,
g_socket_create_source (client->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
sink->cancellable);
g_source_set_callback (client->source,
@ -1325,7 +1321,7 @@ restart:
* garbage list and removed.
*/
static gboolean
gst_multi_socket_sink_socket_condition (GSocket * socket,
gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
GIOCondition condition, GstMultiSocketSink * sink)
{
GList *clink;
@ -1337,7 +1333,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, socket);
clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
if (clink == NULL) {
ret = FALSE;
goto done;
@ -1354,7 +1350,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
}
if ((condition & G_IO_ERR)) {
GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket);
GST_WARNING_OBJECT (sink, "Socket %p has error", mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_ERROR;
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
@ -1500,7 +1496,7 @@ gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
if (client->source)
continue;
client->source =
g_socket_create_source (client->socket,
g_socket_create_source (client->handle.socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
mssink->cancellable);
g_source_set_callback (client->source,

View file

@ -55,7 +55,7 @@ typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass;
typedef struct {
GstMultiHandleClient client;
GSocket *socket;
GstMultiSinkHandle handle;
GSource *source;
} GstSocketClient;
@ -80,34 +80,34 @@ struct _GstMultiSocketSinkClass {
GstMultiHandleSinkClass parent_class;
/* element methods */
void (*add) (GstMultiSocketSink *sink, GSocket *socket);
void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
void (*add) (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void (*add_full) (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiSocketSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket);
void (*remove) (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void (*remove_flush) (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void (*clear) (GstMultiSocketSink *sink);
GstStructure* (*get_stats) (GstMultiSocketSink *sink, GSocket *socket);
GstStructure* (*get_stats) (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
/* vtable */
void (*removed) (GstMultiSocketSink *sink, GSocket *socket);
void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
/* signals */
void (*client_added) (GstElement *element, GSocket *socket);
void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
void (*client_socket_removed) (GstElement *element, GSocket *socket);
void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle);
};
GType gst_multi_socket_sink_get_type (void);
void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
void gst_multi_socket_sink_clear (GstMultiHandleSink *sink);
GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket);
GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
G_END_DECLS

View file

@ -59,8 +59,8 @@ static void gst_tcp_server_sink_finalize (GObject * gobject);
static gboolean gst_tcp_server_sink_init_send (GstMultiHandleSink * this);
static gboolean gst_tcp_server_sink_close (GstMultiHandleSink * this);
static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink,
GSocket * socket);
static void gst_tcp_server_sink_removed (GstMultiHandleSink * sink,
GstMultiSinkHandle handle);
static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -147,7 +147,8 @@ gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink)
if (!client_socket)
goto accept_failed;
gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), client_socket);
gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink),
(GstMultiSinkHandle) client_socket);
#ifndef GST_DISABLE_GST_DEBUG
{
@ -178,7 +179,8 @@ accept_failed:
}
static void
gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket)
gst_tcp_server_sink_removed (GstMultiHandleSink * sink,
GstMultiSinkHandle handle)
{
#ifndef GST_DISABLE_GST_DEBUG
GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
@ -187,7 +189,7 @@ gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket)
GST_DEBUG_OBJECT (this, "closing socket");
if (!g_socket_close (socket, &err)) {
if (!g_socket_close (handle.socket, &err)) {
GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
g_clear_error (&err);
}