multiudpsink: re-use send_buffers() code path for render() function

It's like rendering a buffer list, just with one buffer.
Has the added advantage that if there are multiple clients
we can send the buffer to all the clients in one go.
This commit is contained in:
Tim-Philipp Müller 2014-06-24 01:16:37 +01:00
parent 54a9a436ba
commit 97a2eb7afb

View file

@ -912,126 +912,19 @@ static GstFlowReturn
gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{ {
GstMultiUDPSink *sink; GstMultiUDPSink *sink;
GList *clients; GstFlowReturn flow;
GOutputVector *vec; guint8 n_mem;
GstMapInfo *map;
guint n_mem, i;
gsize size;
GstMemory *mem;
gint num, no_clients;
GError *err = NULL;
sink = GST_MULTIUDPSINK_CAST (bsink); sink = GST_MULTIUDPSINK_CAST (bsink);
n_mem = gst_buffer_n_memory (buffer); n_mem = gst_buffer_n_memory (buffer);
if (n_mem == 0)
goto no_data;
/* pre-allocated, the max number of memory blocks is limited so this if (n_mem > 0)
* should not cause overflows */ flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
vec = sink->vec; else
map = sink->map; flow = GST_FLOW_OK;
size = 0; return flow;
for (i = 0; i < n_mem; i++) {
mem = gst_buffer_peek_memory (buffer, i);
gst_memory_map (mem, &map[i], GST_MAP_READ);
vec[i].buffer = map[i].data;
vec[i].size = map[i].size;
size += map[i].size;
}
sink->bytes_to_serve += size;
/* grab lock while iterating and sending to clients, this should be
* fast as UDP never blocks */
g_mutex_lock (&sink->client_lock);
GST_LOG_OBJECT (bsink, "about to send %" G_GSIZE_FORMAT " bytes in %u blocks",
size, n_mem);
no_clients = 0;
num = 0;
for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstUDPClient *client;
GSocket *socket;
GSocketFamily family;
gint count;
client = (GstUDPClient *) clients->data;
no_clients++;
GST_LOG_OBJECT (sink, "sending %" G_GSIZE_FORMAT " bytes to client %p",
size, client);
family = g_socket_address_get_family (G_SOCKET_ADDRESS (client->addr));
/* Select socket to send from for this address */
if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
socket = sink->used_socket_v6;
else
socket = sink->used_socket;
count = sink->send_duplicates ? client->add_count : 1;
while (count--) {
gssize ret;
ret =
g_socket_send_message (socket, client->addr, vec, n_mem,
NULL, 0, 0, sink->cancellable, &err);
if (G_UNLIKELY (ret < 0)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
goto flushing;
/* we continue after posting a warning, next packets might be ok
* again */
if (size > UDP_MAX_SIZE) {
GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
("Attempting to send a UDP packet larger than maximum size "
"(%" G_GSIZE_FORMAT " > %d)", size, UDP_MAX_SIZE),
("Reason: %s", err ? err->message : "unknown reason"));
} else {
GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
("Error sending UDP packet"), ("Reason: %s",
err ? err->message : "unknown reason"));
}
g_clear_error (&err);
} else {
num++;
client->bytes_sent += ret;
client->packets_sent++;
sink->bytes_served += ret;
}
}
}
g_mutex_unlock (&sink->client_lock);
/* unmap all memory again */
for (i = 0; i < n_mem; i++)
gst_memory_unmap (map[i].memory, &map[i]);
GST_LOG_OBJECT (sink, "sent %" G_GSIZE_FORMAT " bytes to %d (of %d) clients",
size, num, no_clients);
return GST_FLOW_OK;
no_data:
{
return GST_FLOW_OK;
}
flushing:
{
GST_DEBUG ("we are flushing");
g_mutex_unlock (&sink->client_lock);
g_clear_error (&err);
/* unmap all memory */
for (i = 0; i < n_mem; i++)
gst_memory_unmap (map[i].memory, &map[i]);
return GST_FLOW_FLUSHING;
}
} }
static void static void