udpsink: handle scather gather from buffers

Iterate the memory blocks on the buffer and send them using sendmsg.
This commit is contained in:
Wim Taymans 2011-04-05 19:15:11 +02:00
parent 4e7f1633e4
commit 6df597cbbf

View file

@ -488,6 +488,8 @@ socket_last_error_message (void)
#endif #endif
} }
#ifdef G_OS_WIN32
/* version without sendmsg */
static GstFlowReturn static GstFlowReturn
gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{ {
@ -565,106 +567,123 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
return GST_FLOW_OK; return GST_FLOW_OK;
} }
#else /* !G_OS_WIN32 */
#if 0 /* version with sendmsg */
#ifndef G_OS_WIN32
static GstFlowReturn static GstFlowReturn
gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{ {
GstMultiUDPSink *sink; GstMultiUDPSink *sink;
GList *clients; GList *clients;
gint ret, size = 0, num = 0, no_clients = 0; gint ret, size = 0, num = 0, no_clients = 0;
struct iovec *iov; struct iovec *iov;
struct msghdr msg = { 0 }; struct msghdr msg = { 0 };
GstBufferListIterator *it; guint n_mem, i;
guint gsize; gpointer bdata;
GstBuffer *buf; gsize bsize;
GstMemory *mem;
sink = GST_MULTIUDPSINK (bsink); sink = GST_MULTIUDPSINK (bsink);
g_return_val_if_fail (list != NULL, GST_FLOW_ERROR); msg.msg_iovlen = 0;
size = 0;
it = gst_buffer_list_iterate (list); n_mem = gst_buffer_n_memory (buffer);
g_return_val_if_fail (it != NULL, GST_FLOW_ERROR); if (n_mem == 0)
goto no_data;
while (gst_buffer_list_iterator_next_group (it)) { iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec));
msg.msg_iovlen = 0; msg.msg_iov = iov;
size = 0;
if ((gsize = gst_buffer_list_iterator_n_buffers (it)) == 0) { for (i = 0; i < n_mem; i++) {
goto invalid_list; mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ);
if (bsize > UDP_MAX_SIZE) {
GST_WARNING ("Attempting to send a UDP packet larger than maximum "
"size (%d > %d)", bsize, UDP_MAX_SIZE);
} }
iov = (struct iovec *) g_malloc (gsize * sizeof (struct iovec)); msg.msg_iov[msg.msg_iovlen].iov_len = bsize;
msg.msg_iov = iov; msg.msg_iov[msg.msg_iovlen].iov_base = bdata;
msg.msg_iovlen++;
while ((buf = gst_buffer_list_iterator_next (it))) { size += bsize;
if (GST_BUFFER_SIZE (buf) > UDP_MAX_SIZE) { }
GST_WARNING ("Attempting to send a UDP packet larger than maximum "
"size (%d > %d)", GST_BUFFER_SIZE (buf), UDP_MAX_SIZE);
}
msg.msg_iov[msg.msg_iovlen].iov_len = GST_BUFFER_SIZE (buf); sink->bytes_to_serve += size;
msg.msg_iov[msg.msg_iovlen].iov_base = GST_BUFFER_DATA (buf);
msg.msg_iovlen++;
size += GST_BUFFER_SIZE (buf);
}
sink->bytes_to_serve += size; /* grab lock while iterating and sending to clients, this should be
* fast as UDP never blocks */
g_mutex_lock (sink->client_lock);
GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
/* grab lock while iterating and sending to clients, this should be for (clients = sink->clients; clients; clients = g_list_next (clients)) {
* fast as UDP never blocks */ GstUDPClient *client;
g_mutex_lock (sink->client_lock); gint count;
GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
for (clients = sink->clients; clients; clients = g_list_next (clients)) { client = (GstUDPClient *) clients->data;
GstUDPClient *client; no_clients++;
gint count; GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
client = (GstUDPClient *) clients->data; count = sink->send_duplicates ? client->refcount : 1;
no_clients++;
GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
count = sink->send_duplicates ? client->refcount : 1; while (count--) {
while (TRUE) {
msg.msg_name = (void *) &client->theiraddr;
msg.msg_namelen = sizeof (client->theiraddr);
ret = sendmsg (*client->sock, &msg, 0);
while (count--) { if (ret < 0) {
while (TRUE) { if (!socket_error_is_ignorable ()) {
msg.msg_name = (void *) &client->theiraddr; gchar *errormessage = socket_last_error_message ();
msg.msg_namelen = sizeof (client->theiraddr); GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
ret = sendmsg (*client->sock, &msg, 0); socket_last_error_code (), errormessage);
g_free (errormessage);
if (ret < 0) { break;
if (!socket_error_is_ignorable ()) {
break;
}
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break; break;
} }
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
break;
} }
} }
} }
g_mutex_unlock (sink->client_lock);
g_free (iov);
msg.msg_iov = NULL;
GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
no_clients);
} }
g_mutex_unlock (sink->client_lock);
gst_buffer_list_iterator_free (it); /* unmap all memory again */
for (i = 0; i < n_mem; i++) {
mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
bsize = msg.msg_iov[i].iov_len;
bdata = msg.msg_iov[i].iov_base;
gst_memory_unmap (mem, bdata, bsize);
}
g_free (iov);
GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
no_clients);
return GST_FLOW_OK; return GST_FLOW_OK;
invalid_list: no_data:
gst_buffer_list_iterator_free (it); {
return GST_FLOW_ERROR; return GST_FLOW_OK;
}
} }
#endif #endif
#if 0
/* DISABLED, core sends buffers to our render one by one, we can't really do
* much better */
static GstFlowReturn
gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
{
}
#endif #endif
static void static void