From 6df597cbbfb4c223a893506b4bf32a7c3386bdcf Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 5 Apr 2011 19:15:11 +0200 Subject: [PATCH] udpsink: handle scather gather from buffers Iterate the memory blocks on the buffer and send them using sendmsg. --- gst/udp/gstmultiudpsink.c | 149 +++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 65 deletions(-) diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index 19972d79d5..2a677fc290 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -488,6 +488,8 @@ socket_last_error_message (void) #endif } +#ifdef G_OS_WIN32 +/* version without sendmsg */ static GstFlowReturn gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { @@ -565,106 +567,123 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) return GST_FLOW_OK; } - -#if 0 -#ifndef G_OS_WIN32 +#else /* !G_OS_WIN32 */ +/* version with sendmsg */ static GstFlowReturn -gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) +gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstMultiUDPSink *sink; GList *clients; gint ret, size = 0, num = 0, no_clients = 0; struct iovec *iov; struct msghdr msg = { 0 }; - GstBufferListIterator *it; - guint gsize; - GstBuffer *buf; + guint n_mem, i; + gpointer bdata; + gsize bsize; + GstMemory *mem; sink = GST_MULTIUDPSINK (bsink); - g_return_val_if_fail (list != NULL, GST_FLOW_ERROR); + msg.msg_iovlen = 0; + size = 0; - it = gst_buffer_list_iterate (list); - g_return_val_if_fail (it != NULL, GST_FLOW_ERROR); + n_mem = gst_buffer_n_memory (buffer); + if (n_mem == 0) + goto no_data; - while (gst_buffer_list_iterator_next_group (it)) { - msg.msg_iovlen = 0; - size = 0; + iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec)); + msg.msg_iov = iov; - if ((gsize = gst_buffer_list_iterator_n_buffers (it)) == 0) { - goto invalid_list; + for (i = 0; i < n_mem; i++) { + mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); + bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ); + + if (bsize > UDP_MAX_SIZE) { + GST_WARNING ("Attempting to send a UDP packet larger than maximum " + "size (%d > %d)", bsize, UDP_MAX_SIZE); } - iov = (struct iovec *) g_malloc (gsize * sizeof (struct iovec)); - msg.msg_iov = iov; + msg.msg_iov[msg.msg_iovlen].iov_len = bsize; + msg.msg_iov[msg.msg_iovlen].iov_base = bdata; + msg.msg_iovlen++; - while ((buf = gst_buffer_list_iterator_next (it))) { - if (GST_BUFFER_SIZE (buf) > UDP_MAX_SIZE) { - GST_WARNING ("Attempting to send a UDP packet larger than maximum " - "size (%d > %d)", GST_BUFFER_SIZE (buf), UDP_MAX_SIZE); - } + size += bsize; + } - msg.msg_iov[msg.msg_iovlen].iov_len = GST_BUFFER_SIZE (buf); - msg.msg_iov[msg.msg_iovlen].iov_base = GST_BUFFER_DATA (buf); - msg.msg_iovlen++; - size += GST_BUFFER_SIZE (buf); - } + sink->bytes_to_serve += size; - sink->bytes_to_serve += size; + /* grab lock while iterating and sending to clients, this should be + * fast as UDP never blocks */ + g_mutex_lock (sink->client_lock); + GST_LOG_OBJECT (bsink, "about to send %d bytes", size); - /* grab lock while iterating and sending to clients, this should be - * fast as UDP never blocks */ - g_mutex_lock (sink->client_lock); - GST_LOG_OBJECT (bsink, "about to send %d bytes", size); + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstUDPClient *client; + gint count; - for (clients = sink->clients; clients; clients = g_list_next (clients)) { - GstUDPClient *client; - gint count; + client = (GstUDPClient *) clients->data; + no_clients++; + GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); - client = (GstUDPClient *) clients->data; - no_clients++; - GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); + count = sink->send_duplicates ? client->refcount : 1; - count = sink->send_duplicates ? client->refcount : 1; + while (count--) { + while (TRUE) { + msg.msg_name = (void *) &client->theiraddr; + msg.msg_namelen = sizeof (client->theiraddr); + ret = sendmsg (*client->sock, &msg, 0); - while (count--) { - while (TRUE) { - msg.msg_name = (void *) &client->theiraddr; - msg.msg_namelen = sizeof (client->theiraddr); - ret = sendmsg (*client->sock, &msg, 0); - - if (ret < 0) { - if (!socket_error_is_ignorable ()) { - break; - } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; + if (ret < 0) { + if (!socket_error_is_ignorable ()) { + gchar *errormessage = socket_last_error_message (); + GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, + socket_last_error_code (), errormessage); + g_free (errormessage); + break; break; } + } else { + num++; + client->bytes_sent += ret; + client->packets_sent++; + sink->bytes_served += ret; + break; } } } - g_mutex_unlock (sink->client_lock); - - g_free (iov); - msg.msg_iov = NULL; - - GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, - no_clients); } + g_mutex_unlock (sink->client_lock); - gst_buffer_list_iterator_free (it); + /* unmap all memory again */ + for (i = 0; i < n_mem; i++) { + mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); + + bsize = msg.msg_iov[i].iov_len; + bdata = msg.msg_iov[i].iov_base; + + gst_memory_unmap (mem, bdata, bsize); + } + g_free (iov); + + GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, + no_clients); return GST_FLOW_OK; -invalid_list: - gst_buffer_list_iterator_free (it); - return GST_FLOW_ERROR; +no_data: + { + return GST_FLOW_OK; + } } #endif + +#if 0 +/* DISABLED, core sends buffers to our render one by one, we can't really do + * much better */ +static GstFlowReturn +gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) +{ +} #endif static void