From 64d8ec645983d2957df90f9d6beff2cb3ca1ce43 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Fri, 27 Jan 2012 21:28:05 +0100 Subject: [PATCH] multihandlesink: introduce Handle union --- gst/tcp/gstmultifdsink.c | 157 ++++++++++++++-------------- gst/tcp/gstmultifdsink.h | 28 ++--- gst/tcp/gstmultihandlesink.h | 25 +++-- gst/tcp/gstmultisocketsink.c | 196 +++++++++++++++++------------------ gst/tcp/gstmultisocketsink.h | 30 +++--- gst/tcp/gsttcpserversink.c | 12 ++- 6 files changed, 228 insertions(+), 220 deletions(-) diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 136b22d649..006c344e6c 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -457,7 +457,7 @@ gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client) /* "add-full" signal implementation */ void -gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, +gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format, guint64 min_value, GstFormat max_format, guint64 max_value) { @@ -467,11 +467,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, gint flags; struct stat statbuf; GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + // FIXME: convert to a function so we can vfunc this + int fd = handle.fd; - GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, " + GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, " "min_format %d, min_value %" G_GUINT64_FORMAT - ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method, - min_format, min_value, max_format, max_value); + ", max_format %d, max_value %" G_GUINT64_FORMAT, mhclient->debug, + sync_method, min_format, min_value, max_format, max_value); /* do limits check if we can */ if (min_format == max_format) { @@ -505,8 +507,8 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, /* set the socket to non blocking */ if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) { - GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd, - g_strerror (errno)); + GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", + mhclient->debug, g_strerror (errno)); } /* we always read from a client */ @@ -538,18 +540,18 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, wrong_limits: { GST_WARNING_OBJECT (sink, - "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%" - G_GUINT64_FORMAT ", unit %d specified when adding client", fd, - min_value, max_value, min_format); + "%s wrong values min =%" G_GUINT64_FORMAT ", max=%" + G_GUINT64_FORMAT ", unit %d specified when adding client", + mhclient->debug, min_value, max_value, min_format); return; } duplicate: { mhclient->status = GST_CLIENT_STATUS_DUPLICATE; CLIENTS_UNLOCK (sink); - GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); + GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", fd); g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, + gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, mhclient->debug, mhclient->status); g_free (client); return; @@ -558,26 +560,27 @@ duplicate: /* "add" signal implementation */ void -gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) +gst_multi_fd_sink_add (GstMultiFdSink * sink, GstMultiSinkHandle handle) { - GstMultiHandleSink *mhsink; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - mhsink = GST_MULTI_HANDLE_SINK (sink); - gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method, + gst_multi_fd_sink_add_full (sink, handle, mhsink->def_sync_method, mhsink->def_burst_format, mhsink->def_burst_value, mhsink->def_burst_format, -1); } /* "remove" signal implementation */ void -gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) +gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle) { GList *clink; GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + // FIXME: convert to a function so we can vfunc this + int fd = handle.fd; - GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd); + GST_DEBUG_OBJECT (sink, "%s removing client", fd); CLIENTS_LOCK (sink); clink = g_hash_table_lookup (sink->fd_hash, &fd); @@ -587,7 +590,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", + "%s Client already disconnecting with status %d", fd, mhclient->status); goto done; } @@ -597,7 +600,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) // FIXME: specific poll gst_poll_restart (sink->fdset); } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd); } done: @@ -606,11 +609,14 @@ done: /* "remove-flush" signal implementation */ void -gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) +gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, + GstMultiSinkHandle handle) { GList *clink; + // FIXME: convert to a function so we can vfunc this + int fd = handle.fd; - GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd); + GST_DEBUG_OBJECT (sink, "%s flushing client", fd); CLIENTS_LOCK (sink); clink = g_hash_table_lookup (sink->fd_hash, &fd); @@ -620,7 +626,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", + "%s Client already disconnecting with status %d", fd, mhclient->status); goto done; } @@ -633,7 +639,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) * it might have some buffers to flush in the ->sending queue. */ mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd); } done: CLIENTS_UNLOCK (sink); @@ -661,11 +667,13 @@ gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink) * guint64 : timestamp of the last buffer sent (in nanoseconds) */ GValueArray * -gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) +gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, GstMultiSinkHandle handle) { GstTCPClient *client; GValueArray *result = NULL; GList *clink; + // FIXME: convert to a function so we can vfunc this + int fd = handle.fd; CLIENTS_LOCK (sink); clink = g_hash_table_lookup (sink->fd_hash, &fd); @@ -727,7 +735,7 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this found!", fd); result = g_value_array_new (0); } @@ -742,16 +750,12 @@ noclient: static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) { - int fd; GTimeVal now; GstTCPClient *client = (GstTCPClient *) link->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink); - GstMultiFdSinkClass *fclass; - - fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); - - fd = client->fd.fd; + GstMultiFdSinkClass *fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); + int fd = client->fd.fd; if (mhclient->currently_removing) { GST_WARNING_OBJECT (sink, "%s client is already being removed", @@ -822,7 +826,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) * remove it from the hashtable here */ if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) { GST_WARNING_OBJECT (sink, - "[fd %5d] error removing client %p from hash", client->fd.fd, client); + "%s error removing client %p from hash", client->fd.fd, client); } /* after releasing the lock above, the link could be invalid, more * precisely, the next and prev pointers could point to invalid list @@ -833,7 +837,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) sink->clients_cookie++; if (fclass->removed) - fclass->removed (mfsink, client->fd.fd); + fclass->removed (mfsink, (GstMultiSinkHandle) client->fd.fd); g_free (client); CLIENTS_UNLOCK (sink); @@ -861,18 +865,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, if (ioctl (fd, FIONREAD, &avail) < 0) goto ioctl_failed; - GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes", + GST_DEBUG_OBJECT (sink, "%s select reports client read of %d bytes", fd, avail); ret = TRUE; if (avail == 0) { /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd); + GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", fd); mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (avail < 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd); + GST_WARNING_OBJECT (sink, "%s avail < 0, removing", fd); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; } else { @@ -886,18 +890,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, /* this is the maximum we can read */ gint to_read = MIN (avail, 512); - GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes", + GST_DEBUG_OBJECT (sink, "%s client wants us to read %d bytes", fd, to_read); nread = read (fd, dummy, to_read); if (nread < -1) { - GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)", + GST_WARNING_OBJECT (sink, "%s could not read %d bytes: %s (%d)", fd, to_read, g_strerror (errno), errno); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } else if (nread == 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd); + GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", fd); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; @@ -911,7 +915,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, /* ERRORS */ ioctl_failed: { - GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", + GST_WARNING_OBJECT (sink, "%s ioctl failed: %s (%d)", fd, g_strerror (errno), errno); mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; @@ -929,7 +933,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, gboolean send_streamheader = FALSE; GstStructure *s; GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink); - GstTCPClient *client = (GstTCPClient *) mhclient; /* before we queue the buffer, we check if we need to queue streamheader * buffers (because it's a new client, or because they changed) */ @@ -937,8 +940,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, if (!mhclient->caps) { GST_DEBUG_OBJECT (sink, - "[fd %5d] no previous caps for this client, send streamheader", - client->fd.fd); + "%s no previous caps for this client, send streamheader", + mhclient->debug); send_streamheader = TRUE; mhclient->caps = gst_caps_ref (caps); } else { @@ -951,23 +954,23 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, if (!gst_structure_has_field (s, "streamheader")) { /* no new streamheader, so nothing new to send */ GST_DEBUG_OBJECT (sink, - "[fd %5d] new caps do not have streamheader, not sending", - client->fd.fd); + "%s new caps do not have streamheader, not sending", + mhclient->debug); } else { /* there is a new streamheader */ s = gst_caps_get_structure (mhclient->caps, 0); if (!gst_structure_has_field (s, "streamheader")) { /* no previous streamheader, so send the new one */ GST_DEBUG_OBJECT (sink, - "[fd %5d] previous caps did not have streamheader, sending", - client->fd.fd); + "%s previous caps did not have streamheader, sending", + mhclient->debug); send_streamheader = TRUE; } else { /* both old and new caps have streamheader set */ if (!mhsink->resend_streamheader) { GST_DEBUG_OBJECT (sink, - "[fd %5d] asked to not resend the streamheader, not sending", - client->fd.fd); + "%s asked to not resend the streamheader, not sending", + mhclient->debug); send_streamheader = FALSE; } else { sh1 = gst_structure_get_value (s, "streamheader"); @@ -975,8 +978,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, sh2 = gst_structure_get_value (s, "streamheader"); if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) { GST_DEBUG_OBJECT (sink, - "[fd %5d] new streamheader different from old, sending", - client->fd.fd); + "%s new streamheader different from old, sending", + mhclient->debug); send_streamheader = TRUE; } } @@ -994,16 +997,16 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, int i; GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); + "%s sending streamheader from caps %" GST_PTR_FORMAT, + mhclient->debug, caps); s = gst_caps_get_structure (caps, 0); if (!gst_structure_has_field (s, "streamheader")) { GST_DEBUG_OBJECT (sink, - "[fd %5d] no new streamheader, so nothing to send", client->fd.fd); + "%s no new streamheader, so nothing to send", mhclient->debug); } else { GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); + "%s sending streamheader from caps %" GST_PTR_FORMAT, + mhclient->debug, caps); sh = gst_structure_get_value (s, "streamheader"); g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY); buffers = g_value_peek_pointer (sh); @@ -1016,8 +1019,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER); buffer = g_value_peek_pointer (bufval); GST_DEBUG_OBJECT (sink, - "[fd %5d] queueing streamheader buffer of length %" G_GSIZE_FORMAT, - client->fd.fd, gst_buffer_get_size (buffer)); + "%s queueing streamheader buffer of length %" G_GSIZE_FORMAT, + mhclient->debug, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); mhclient->sending = g_slist_append (mhclient->sending, buffer); @@ -1028,8 +1031,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, gst_caps_unref (caps); caps = NULL; - GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %" G_GSIZE_FORMAT, - client->fd.fd, gst_buffer_get_size (buffer)); + GST_LOG_OBJECT (sink, "%s queueing buffer of length %" G_GSIZE_FORMAT, + mhclient->debug, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); mhclient->sending = g_slist_append (mhclient->sending, buffer); @@ -1063,7 +1066,7 @@ static gboolean gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, GstTCPClient * client) { - int fd = client->fd.fd; + GstMultiSinkHandle handle = (GstMultiSinkHandle) client->fd.fd; gboolean more; gboolean flushing; GstClockTime now; @@ -1072,7 +1075,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - + int fd = handle.fd; g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -1137,8 +1140,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, if (mhclient->flushcount != -1) mhclient->flushcount--; - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - fd, client, mhclient->bufpos); + GST_LOG_OBJECT (sink, "%s client %p at position %d", + mhclient->debug, client, mhclient->bufpos); /* queueing a buffer will ref it */ mhsinkclass->client_queue_buffer (mhsink, mhclient, buf); @@ -1191,7 +1194,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* partial write means that the client cannot read more and we should * stop sending more */ GST_LOG_OBJECT (sink, - "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote); + "partial write on %d of %" G_GSSIZE_FORMAT " bytes", + mhclient->debug, wrote); mhclient->bufoffset += wrote; more = FALSE; } else { @@ -1214,20 +1218,20 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* ERRORS */ flushed: { - GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd); + GST_DEBUG_OBJECT (sink, "%s flushed, removing", fd); mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { - GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); + GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", fd); mhclient->status = GST_CLIENT_STATUS_CLOSED; return FALSE; } write_error: { GST_WARNING_OBJECT (sink, - "[fd %5d] could not write, removing client: %s (%d)", fd, + "%s could not write, removing client: %s (%d)", mhclient->debug, g_strerror (errno), errno); mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; @@ -1307,8 +1311,8 @@ restart: next = g_list_next (clients); mhclient->bufpos++; - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - client->fd.fd, client, mhclient->bufpos); + GST_LOG_OBJECT (sink, "%s client %p at position %d", + mhclient->debug, client, mhclient->bufpos); /* check soft max if needed, recover client */ if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; @@ -1318,12 +1322,11 @@ restart: mhclient->dropped_buffers += mhclient->bufpos - newpos; mhclient->bufpos = newpos; mhclient->discont = TRUE; - GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d", - client->fd.fd, client, mhclient->bufpos); + GST_INFO_OBJECT (sink, "%s client %p position reset to %d", + mhclient->debug, client, mhclient->bufpos); } else { GST_INFO_OBJECT (sink, - "[fd %5d] client %p not recovering position", - client->fd.fd, client); + "%s client %p not recovering position", mhclient->debug, client); } } /* check hard max and timeout, remove client */ @@ -1331,8 +1334,8 @@ restart: (mhsink->timeout > 0 && now - mhclient->last_activity_time > mhsink->timeout)) { /* remove client */ - GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing", - client->fd.fd, client); + GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing", + mhclient->debug, client); /* remove the client, the fd set will be cleared and the select thread * will be signaled */ mhclient->status = GST_CLIENT_STATUS_SLOW; @@ -1446,6 +1449,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + int fd; fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); @@ -1500,7 +1504,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) for (clients = mhsink->clients; clients; clients = next) { GstTCPClient *client; GstMultiHandleClient *mhclient; - int fd; long flags; int res; diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index d589539b72..5f6a5d9514 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -80,35 +80,35 @@ struct _GstMultiFdSinkClass { GstMultiHandleSinkClass parent_class; /* element methods */ - void (*add) (GstMultiFdSink *sink, int fd); - void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, + void (*add) (GstMultiFdSink *sink, GstMultiSinkHandle handle); + void (*add_full) (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, GstFormat format, guint64 value, GstFormat max_format, guint64 max_value); - void (*remove) (GstMultiFdSink *sink, int fd); - void (*remove_flush) (GstMultiFdSink *sink, int fd); + void (*remove) (GstMultiFdSink *sink, GstMultiSinkHandle handle); + void (*remove_flush) (GstMultiFdSink *sink, GstMultiSinkHandle handle); void (*clear) (GstMultiFdSink *sink); - GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); + GValueArray* (*get_stats) (GstMultiFdSink *sink, GstMultiSinkHandle handle); /* vtable */ gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set); - void (*removed) (GstMultiFdSink *sink, int fd); + void (*removed) (GstMultiFdSink *sink, GstMultiSinkHandle handle); /* signals */ - void (*client_added) (GstElement *element, gint fd); - void (*client_removed) (GstElement *element, gint fd, GstClientStatus status); - void (*client_fd_removed) (GstElement *element, gint fd); + void (*client_added) (GstElement *element, GstMultiSinkHandle handle); + void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status); + void (*client_fd_removed) (GstElement *element, GstMultiSinkHandle handle); }; GType gst_multi_fd_sink_get_type (void); -void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, +void gst_multi_fd_sink_add (GstMultiFdSink *sink, GstMultiSinkHandle handle); +void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, GstFormat min_format, guint64 min_value, GstFormat max_format, guint64 max_value); -void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); +void gst_multi_fd_sink_remove (GstMultiFdSink *sink, GstMultiSinkHandle handle); +void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, GstMultiSinkHandle handle); void gst_multi_fd_sink_clear (GstMultiHandleSink *sink); -GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); +GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, GstMultiSinkHandle handle); G_END_DECLS diff --git a/gst/tcp/gstmultihandlesink.h b/gst/tcp/gstmultihandlesink.h index ba4e23bad2..bf273bac53 100644 --- a/gst/tcp/gstmultihandlesink.h +++ b/gst/tcp/gstmultihandlesink.h @@ -120,6 +120,13 @@ typedef enum GST_CLIENT_STATUS_FLUSHING = 6 } GstClientStatus; +// FIXME: is it better to use GSocket * or a gpointer here ? +typedef union +{ + int fd; + GSocket *socket; +} GstMultiSinkHandle; + /* structure for a client */ typedef struct { @@ -251,12 +258,12 @@ struct _GstMultiHandleSinkClass { GstBaseSinkClass parent_class; /* element methods */ - void (*add) (GstMultiHandleSink *sink, GSocket *socket); - void (*add_full) (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync, + void (*add) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); + void (*add_full) (GstMultiHandleSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, GstFormat format, guint64 value, GstFormat max_format, guint64 max_value); - void (*remove) (GstMultiHandleSink *sink, GSocket *socket); - void (*remove_flush) (GstMultiHandleSink *sink, GSocket *socket); + void (*remove) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); + void (*remove_flush) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); void (*clear) (GstMultiHandleSink *sink); void (*clear_post) (GstMultiHandleSink *sink); void (*stop_pre) (GstMultiHandleSink *sink); @@ -273,18 +280,18 @@ struct _GstMultiHandleSinkClass { (GstMultiHandleClient *client); - GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket); + GstStructure* (*get_stats) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); void (*remove_client_link) (GstMultiHandleSink * sink, GList * link); /* vtable */ gboolean (*init) (GstMultiHandleSink *sink); gboolean (*close) (GstMultiHandleSink *sink); - void (*removed) (GstMultiHandleSink *sink, GSocket *socket); + void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); /* signals */ - void (*client_added) (GstElement *element, GSocket *socket); - void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status); - void (*client_socket_removed) (GstElement *element, GSocket *socket); + void (*client_added) (GstElement *element, GstMultiSinkHandle handle); + void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status); + void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle); }; GType gst_multi_handle_sink_get_type (void); diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index 831ffdf9ea..d3f640ca0b 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -160,8 +160,8 @@ static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client); static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, GList * link); -static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket, - GIOCondition condition, GstMultiSocketSink * sink); +static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle + handle, GIOCondition condition, GstMultiSocketSink * sink); static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink); static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink); @@ -399,23 +399,25 @@ gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client) { GstSocketClient *msclient = (GstSocketClient *) client; - return g_socket_get_fd (msclient->socket); + return g_socket_get_fd (msclient->handle.socket); } /* "add-full" signal implementation */ void -gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, - GstSyncMethod sync_method, GstFormat min_format, guint64 min_value, - GstFormat max_format, guint64 max_value) +gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, + GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format, + guint64 min_value, GstFormat max_format, guint64 max_value) { GstSocketClient *client; GstMultiHandleClient *mhclient; GList *clink; GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); - GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, " + g_assert (G_IS_SOCKET (handle.socket)); + + GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, " "min_format %d, min_value %" G_GUINT64_FORMAT - ", max_format %d, max_value %" G_GUINT64_FORMAT, socket, + ", max_format %d, max_value %" G_GUINT64_FORMAT, handle.socket, sync_method, min_format, min_value, max_format, max_value); /* do limits check if we can */ @@ -428,8 +430,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, client = g_new0 (GstSocketClient, 1); mhclient = (GstMultiHandleClient *) client; gst_multi_handle_sink_client_init (mhclient, sync_method); - g_snprintf (mhclient->debug, 30, "[socket %p]", socket); - client->socket = G_SOCKET (g_object_ref (socket)); + g_snprintf (mhclient->debug, 30, "[socket %p]", handle.socket); + client->handle.socket = G_SOCKET (g_object_ref (handle.socket)); mhclient->burst_min_format = min_format; mhclient->burst_min_value = min_value; @@ -439,22 +441,22 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, CLIENTS_LOCK (sink); /* check the hash to find a duplicate fd */ - clink = g_hash_table_lookup (sink->socket_hash, socket); + clink = g_hash_table_lookup (sink->socket_hash, handle.socket); if (clink != NULL) goto duplicate; /* we can add the fd now */ clink = mhsink->clients = g_list_prepend (mhsink->clients, client); - g_hash_table_insert (sink->socket_hash, socket, clink); + g_hash_table_insert (sink->socket_hash, handle.socket, clink); mhsink->clients_cookie++; /* set the socket to non blocking */ - g_socket_set_blocking (socket, FALSE); + g_socket_set_blocking (handle.socket, FALSE); /* we always read from a client */ if (sink->main_context) { client->source = - g_socket_create_source (client->socket, + g_socket_create_source (client->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); g_source_set_callback (client->source, (GSourceFunc) gst_multi_socket_sink_socket_condition, @@ -467,7 +469,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, handle); return; @@ -475,19 +477,19 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, wrong_limits: { GST_WARNING_OBJECT (sink, - "[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%" - G_GUINT64_FORMAT ", format %d specified when adding client", socket, - min_value, max_value, min_format); + "%s wrong values min =%" G_GUINT64_FORMAT ", max=%" + G_GUINT64_FORMAT ", format %d specified when adding client", + mhclient->debug, min_value, max_value, min_format); return; } duplicate: { mhclient->status = GST_CLIENT_STATUS_DUPLICATE; CLIENTS_UNLOCK (sink); - GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing", - socket); + GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", + mhclient->debug); g_signal_emit (G_OBJECT (sink), - gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, handle, mhclient->status); g_free (client); return; @@ -496,37 +498,39 @@ duplicate: /* "add" signal implementation */ void -gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket) +gst_multi_socket_sink_add (GstMultiSocketSink * sink, GstMultiSinkHandle handle) { GstMultiHandleSink *mhsink; mhsink = GST_MULTI_HANDLE_SINK (sink); - gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method, + gst_multi_socket_sink_add_full (sink, handle, mhsink->def_sync_method, mhsink->def_burst_format, mhsink->def_burst_value, mhsink->def_burst_format, -1); } /* "remove" signal implementation */ void -gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket) +gst_multi_socket_sink_remove (GstMultiSocketSink * sink, + GstMultiSinkHandle handle) { GList *clink; GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); - GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket); + // FIXME; how to vfunc this ? + GST_DEBUG_OBJECT (sink, "[socket %p] removing client", handle.socket); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, socket); + clink = g_hash_table_lookup (sink->socket_hash, handle.socket); if (clink != NULL) { GstSocketClient *client = clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[socket %p] Client already disconnecting with status %d", - socket, mhclient->status); + "%s Client already disconnecting with status %d", + mhclient->debug, mhclient->status); goto done; } @@ -534,7 +538,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket) mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink); } else { GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!", - socket); + handle.socket); } done: @@ -543,21 +547,22 @@ done: /* "remove-flush" signal implementation */ void -gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket) +gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, + GstMultiSinkHandle handle) { GList *clink; - GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket); + GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", handle.socket); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, socket); + clink = g_hash_table_lookup (sink->socket_hash, handle.socket); if (clink != NULL) { GstSocketClient *client = clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, - "[socket %p] Client already disconnecting with status %d", + "%s Client already disconnecting with status %d", socket, mhclient->status); goto done; } @@ -570,8 +575,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket) * it might have some buffers to flush in the ->sending queue. */ mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { - GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!", - socket); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", socket); } done: CLIENTS_UNLOCK (sink); @@ -580,14 +584,15 @@ done: /* "get-stats" signal implementation */ GstStructure * -gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket) +gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, + GstMultiSinkHandle handle) { GstSocketClient *client; GstStructure *result = NULL; GList *clink; CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, socket); + clink = g_hash_table_lookup (sink->socket_hash, handle.socket); if (clink == NULL) goto noclient; @@ -624,7 +629,7 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { - GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket); + GST_WARNING_OBJECT (sink, "%s no client with this found!", socket); result = gst_structure_new_empty ("multisocketsink-stats"); } @@ -640,7 +645,6 @@ static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) { - GSocket *socket; GTimeVal now; GstSocketClient *client = link->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; @@ -649,8 +653,6 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink); - socket = client->socket; - if (mhclient->currently_removing) { GST_WARNING_OBJECT (sink, "%s client is already being removed", mhclient->debug); @@ -714,7 +716,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, client->handle, mhclient->status); /* lock again before we remove the client completely */ @@ -722,9 +724,9 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, /* fd cannot be reused in the above signal callback so we can safely * remove it from the hashtable here */ - if (!g_hash_table_remove (mssink->socket_hash, socket)) { + if (!g_hash_table_remove (mssink->socket_hash, client->handle.socket)) { GST_WARNING_OBJECT (sink, - "[socket %p] error removing client %p from hash", socket, client); + "%s error removing client %p from hash", mhclient, client); } /* after releasing the lock above, the link could be invalid, more * precisely, the next and prev pointers could point to invalid list @@ -735,15 +737,16 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, sink->clients_cookie++; if (fclass->removed) - fclass->removed (mssink, socket); + fclass->removed (sink, client->handle); g_free (client); CLIENTS_UNLOCK (sink); /* and the fd is really gone now */ g_signal_emit (G_OBJECT (sink), - gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket); - g_object_unref (socket); + gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, + client->handle); + g_object_unref (client->handle.socket); CLIENTS_LOCK (sink); } @@ -762,8 +765,7 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, gboolean first = TRUE; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read", - client->socket); + GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug); ret = TRUE; @@ -773,25 +775,24 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, do { gssize navail; - GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read", - client->socket); + GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug); - navail = g_socket_get_available_bytes (client->socket); + navail = g_socket_get_available_bytes (client->handle.socket); if (navail < 0) break; nread = - g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)), - sink->cancellable, &err); + g_socket_receive (client->handle.socket, dummy, MIN (navail, + sizeof (dummy)), sink->cancellable, &err); if (first && nread == 0) { /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing", - client->socket); + GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", + mhclient->debug); mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (nread < 0) { - GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s", - client->socket, err->message); + GST_WARNING_OBJECT (sink, "%s could not read: %s", + mhclient->debug, err->message); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; @@ -808,7 +809,6 @@ static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer) { - GstSocketClient *client = (GstSocketClient *) mhclient; GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); GstCaps *caps; @@ -822,8 +822,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, if (!mhclient->caps) { GST_DEBUG_OBJECT (sink, - "[socket %p] no previous caps for this client, send streamheader", - client->socket); + "%s no previous caps for this client, send streamheader", + mhclient->debug); send_streamheader = TRUE; mhclient->caps = gst_caps_ref (caps); } else { @@ -836,23 +836,23 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, if (!gst_structure_has_field (s, "streamheader")) { /* no new streamheader, so nothing new to send */ GST_DEBUG_OBJECT (sink, - "[socket %p] new caps do not have streamheader, not sending", - client->socket); + "%s new caps do not have streamheader, not sending", + mhclient->debug); } else { /* there is a new streamheader */ s = gst_caps_get_structure (mhclient->caps, 0); if (!gst_structure_has_field (s, "streamheader")) { /* no previous streamheader, so send the new one */ GST_DEBUG_OBJECT (sink, - "[socket %p] previous caps did not have streamheader, sending", - client->socket); + "%s previous caps did not have streamheader, sending", + mhclient->debug); send_streamheader = TRUE; } else { /* both old and new caps have streamheader set */ if (!mhsink->resend_streamheader) { GST_DEBUG_OBJECT (sink, - "[socket %p] asked to not resend the streamheader, not sending", - client->socket); + "%s asked to not resend the streamheader, not sending", + mhclient->debug); send_streamheader = FALSE; } else { sh1 = gst_structure_get_value (s, "streamheader"); @@ -860,8 +860,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, sh2 = gst_structure_get_value (s, "streamheader"); if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) { GST_DEBUG_OBJECT (sink, - "[socket %p] new streamheader different from old, sending", - client->socket); + "%s new streamheader different from old, sending", + mhclient->debug); send_streamheader = TRUE; } } @@ -879,17 +879,16 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, int i; GST_LOG_OBJECT (sink, - "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT, - client->socket, caps); + "%s sending streamheader from caps %" GST_PTR_FORMAT, + mhclient->debug, caps); s = gst_caps_get_structure (caps, 0); if (!gst_structure_has_field (s, "streamheader")) { GST_DEBUG_OBJECT (sink, - "[socket %p] no new streamheader, so nothing to send", - client->socket); + "%s no new streamheader, so nothing to send", mhclient->debug); } else { GST_LOG_OBJECT (sink, - "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT, - client->socket, caps); + "%s sending streamheader from caps %" GST_PTR_FORMAT, + mhclient->debug, caps); sh = gst_structure_get_value (s, "streamheader"); g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY); buffers = g_value_peek_pointer (sh); @@ -902,8 +901,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER); buffer = g_value_peek_pointer (bufval); GST_DEBUG_OBJECT (sink, - "[socket %p] queueing streamheader buffer of length %" - G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer)); + "%s queueing streamheader buffer of length %" + G_GSIZE_FORMAT, mhclient->debug, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); mhclient->sending = g_slist_append (mhclient->sending, buffer); @@ -915,7 +914,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, caps = NULL; GST_LOG_OBJECT (sink, - "[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket, + "%s queueing buffer of length %" G_GSIZE_FORMAT, mhclient->debug, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); @@ -950,7 +949,6 @@ static gboolean gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, GstSocketClient * client) { - GSocket *socket = client->socket; gboolean more; gboolean flushing; GstClockTime now; @@ -1034,7 +1032,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, if (mhclient->flushcount != -1) mhclient->flushcount--; - GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", + GST_LOG_OBJECT (sink, "%s client %p at position %d", socket, client, mhclient->bufpos); /* queueing a buffer will ref it */ @@ -1061,8 +1059,9 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* try to write the complete buffer */ wrote = - g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset, - maxsize, sink->cancellable, &err); + g_socket_send (client->handle.socket, + (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable, + &err); gst_buffer_unmap (head, &map); if (wrote < 0) { @@ -1101,14 +1100,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* ERRORS */ flushed: { - GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket); + GST_DEBUG_OBJECT (sink, "%s flushed, removing", socket); mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { - GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing", - socket); + GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", socket); mhclient->status = GST_CLIENT_STATUS_CLOSED; g_clear_error (&err); return FALSE; @@ -1116,8 +1114,7 @@ connection_reset: write_error: { GST_WARNING_OBJECT (sink, - "[socket %p] could not write, removing client: %s", socket, - err->message); + "%s could not write, removing client: %s", socket, err->message); g_clear_error (&err); mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; @@ -1198,8 +1195,8 @@ restart: next = g_list_next (clients); mhclient->bufpos++; - GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", - client->socket, client, mhclient->bufpos); + GST_LOG_OBJECT (sink, "%s client %p at position %d", + mhclient->debug, client, mhclient->bufpos); /* check soft max if needed, recover client */ if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; @@ -1209,12 +1206,11 @@ restart: mhclient->dropped_buffers += mhclient->bufpos - newpos; mhclient->bufpos = newpos; mhclient->discont = TRUE; - GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d", - client->socket, client, mhclient->bufpos); + GST_INFO_OBJECT (sink, "%s client %p position reset to %d", + mhclient->debug, client, mhclient->bufpos); } else { GST_INFO_OBJECT (sink, - "[socket %p] client %p not recovering position", - client->socket, client); + "%s client %p not recovering position", mhclient->debug, client); } } /* check hard max and timeout, remove client */ @@ -1222,8 +1218,8 @@ restart: (mhsink->timeout > 0 && now - mhclient->last_activity_time > mhsink->timeout)) { /* remove client */ - GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing", - client->socket, client); + GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing", + mhclient->debug, client); /* remove the client, the fd set will be cleared and the select thread * will be signaled */ mhclient->status = GST_CLIENT_STATUS_SLOW; @@ -1236,7 +1232,7 @@ restart: * the fd_set changed */ if (!client->source) { client->source = - g_socket_create_source (client->socket, + g_socket_create_source (client->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); g_source_set_callback (client->source, @@ -1325,7 +1321,7 @@ restart: * garbage list and removed. */ static gboolean -gst_multi_socket_sink_socket_condition (GSocket * socket, +gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle, GIOCondition condition, GstMultiSocketSink * sink) { GList *clink; @@ -1337,7 +1333,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket, GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, socket); + clink = g_hash_table_lookup (sink->socket_hash, handle.socket); if (clink == NULL) { ret = FALSE; goto done; @@ -1354,7 +1350,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket, } if ((condition & G_IO_ERR)) { - GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket); + GST_WARNING_OBJECT (sink, "Socket %p has error", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_ERROR; mhsinkclass->remove_client_link (mhsink, clink); ret = FALSE; @@ -1500,7 +1496,7 @@ gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink) if (client->source) continue; client->source = - g_socket_create_source (client->socket, + g_socket_create_source (client->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, mssink->cancellable); g_source_set_callback (client->source, diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index 1a0222b10e..2da288f50d 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -55,7 +55,7 @@ typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass; typedef struct { GstMultiHandleClient client; - GSocket *socket; + GstMultiSinkHandle handle; GSource *source; } GstSocketClient; @@ -80,34 +80,34 @@ struct _GstMultiSocketSinkClass { GstMultiHandleSinkClass parent_class; /* element methods */ - void (*add) (GstMultiSocketSink *sink, GSocket *socket); - void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync, + void (*add) (GstMultiSocketSink *sink, GstMultiSinkHandle handle); + void (*add_full) (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, GstFormat format, guint64 value, GstFormat max_format, guint64 max_value); - void (*remove) (GstMultiSocketSink *sink, GSocket *socket); - void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket); + void (*remove) (GstMultiSocketSink *sink, GstMultiSinkHandle handle); + void (*remove_flush) (GstMultiSocketSink *sink, GstMultiSinkHandle handle); void (*clear) (GstMultiSocketSink *sink); - GstStructure* (*get_stats) (GstMultiSocketSink *sink, GSocket *socket); + GstStructure* (*get_stats) (GstMultiSocketSink *sink, GstMultiSinkHandle handle); /* vtable */ - void (*removed) (GstMultiSocketSink *sink, GSocket *socket); + void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); /* signals */ - void (*client_added) (GstElement *element, GSocket *socket); - void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status); - void (*client_socket_removed) (GstElement *element, GSocket *socket); + void (*client_added) (GstElement *element, GstMultiSinkHandle handle); + void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status); + void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle); }; GType gst_multi_socket_sink_get_type (void); -void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GSocket *socket); -void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync, +void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GstMultiSinkHandle handle); +void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, GstFormat min_format, guint64 min_value, GstFormat max_format, guint64 max_value); -void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket); -void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket); +void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GstMultiSinkHandle handle); +void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GstMultiSinkHandle handle); void gst_multi_socket_sink_clear (GstMultiHandleSink *sink); -GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket); +GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GstMultiSinkHandle handle); G_END_DECLS diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index 62acaaf87c..451de742aa 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -59,8 +59,8 @@ static void gst_tcp_server_sink_finalize (GObject * gobject); static gboolean gst_tcp_server_sink_init_send (GstMultiHandleSink * this); static gboolean gst_tcp_server_sink_close (GstMultiHandleSink * this); -static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink, - GSocket * socket); +static void gst_tcp_server_sink_removed (GstMultiHandleSink * sink, + GstMultiSinkHandle handle); static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -147,7 +147,8 @@ gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink) if (!client_socket) goto accept_failed; - gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), client_socket); + gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), + (GstMultiSinkHandle) client_socket); #ifndef GST_DISABLE_GST_DEBUG { @@ -178,7 +179,8 @@ accept_failed: } static void -gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket) +gst_tcp_server_sink_removed (GstMultiHandleSink * sink, + GstMultiSinkHandle handle) { #ifndef GST_DISABLE_GST_DEBUG GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); @@ -187,7 +189,7 @@ gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket) GST_DEBUG_OBJECT (this, "closing socket"); - if (!g_socket_close (socket, &err)) { + if (!g_socket_close (handle.socket, &err)) { GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message); g_clear_error (&err); }