multiudpsink: add sendmmsg-ready render_list function prototype

Add prototype for a render_list() function that can use a
sendmmsg-style g_socket_send_messages() function once it lands
in GLib. We can use this infrastructure to send multiple buffers
made up by multiple memories to multiple clients in one go, which
drastically reduces the number of syscalls made when sending
high-bitrate video streams.

https://bugzilla.gnome.org/show_bug.cgi?id=732152
This commit is contained in:
Tim-Philipp Müller 2014-06-20 11:36:19 +01:00
parent dead5c2476
commit e1a7deb27f
2 changed files with 399 additions and 0 deletions

View file

@ -120,6 +120,8 @@ static void gst_multiudpsink_finalize (GObject * object);
static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
GstBuffer * buffer);
static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
GstBufferList * buffer_list);
static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
@ -359,6 +361,7 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
"Wim Taymans <wim.taymans@gmail.com>");
gstbasesink_class->render = gst_multiudpsink_render;
gstbasesink_class->render_list = gst_multiudpsink_render_list;
gstbasesink_class->start = gst_multiudpsink_start;
gstbasesink_class->stop = gst_multiudpsink_stop;
gstbasesink_class->unlock = gst_multiudpsink_unlock;
@ -408,6 +411,9 @@ gst_multiudpsink_init (GstMultiUDPSink * sink)
sink->vec = g_new (GOutputVector, max_mem);
sink->map = g_new (GstMapInfo, max_mem);
/* we assume that the number of memories per buffer can fit into a guint8 */
g_warn_if_fail (max_mem <= G_MAXUINT8);
}
static GstUDPClient *
@ -533,6 +539,375 @@ gst_multiudpsink_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* replacement until we can depend unconditionally on the real one in GLib */
#ifndef HAVE_G_SOCKET_SEND_MESSAGES
#define g_socket_send_messages gst_socket_send_messages
static gint
gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages,
guint num_messages, gint flags, GCancellable * cancellable, GError ** error)
{
gssize result;
gint i;
for (i = 0; i < num_messages; ++i) {
GstOutputMessage *msg = &messages[i];
GError *msg_error = NULL;
result = g_socket_send_message (socket, msg->address,
msg->vectors, msg->num_vectors,
msg->control_messages, msg->num_control_messages,
flags, cancellable, &msg_error);
if (result < 0) {
/* if we couldn't send all messages, just return how many we did
* manage to send, provided we managed to send at least one */
if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) {
g_error_free (msg_error);
return i;
} else {
g_propagate_error (error, msg_error);
return -1;
}
}
msg->bytes_sent = result;
}
return i;
}
#endif /* HAVE_G_SOCKET_SEND_MESSAGES */
static gsize
fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
{
GstMemory *mem;
gsize size = 0;
guint i;
g_assert (gst_buffer_n_memory (buf) == n);
for (i = 0; i < n; ++i) {
mem = gst_buffer_peek_memory (buf, i);
if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
vecs[i].buffer = maps[i].data;
vecs[i].size = maps[i].size;
} else {
GST_WARNING ("Failed to map memory %p for reading", mem);
vecs[i].buffer = "";
vecs[i].size = 0;
}
size += vecs[i].size;
}
return size;
}
static gsize
gst_udp_calc_message_size (GstOutputMessage * msg)
{
gsize size = 0;
guint i;
for (i = 0; i < msg->num_vectors; ++i)
size += msg->vectors[i].size;
return size;
}
static gint
gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
guint num_messages)
{
guint i;
for (i = 0; i < num_messages; ++i) {
GstOutputMessage *msg = &messages[i];
if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
return i;
}
return -1;
}
static inline gchar *
gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
{
GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
GInetAddress *ia;
gchar *addr_str;
ia = g_inet_socket_address_get_address (isa);
addr_str = g_inet_address_to_string (ia);
g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
g_free (addr_str);
g_object_unref (ia);
return s;
}
/* Wrapper around g_socket_send_messages() plus error handling (ignoring).
* Returns FALSE if we got cancelled, otherwise TRUE. */
static gboolean
gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
GstOutputMessage * messages, guint num_messages)
{
gboolean sent_max_size_warning = FALSE;
while (num_messages > 0) {
gchar astr[64] G_GNUC_UNUSED;
GError *err = NULL;
guint msg_size, skip, i;
gint ret, err_idx;
ret = g_socket_send_messages (socket, messages, num_messages, 0,
sink->cancellable, &err);
if (G_UNLIKELY (ret < 0)) {
GstOutputMessage *msg;
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
return FALSE;
}
err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
if (err_idx < 0)
break;
msg = &messages[err_idx];
msg_size = gst_udp_calc_message_size (msg);
GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
err->message);
skip = 1;
if (msg_size > UDP_MAX_SIZE) {
if (!sent_max_size_warning) {
GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
("Attempting to send a UDP packets larger than maximum size "
"(%u > %d)", msg_size, UDP_MAX_SIZE),
("Reason: %s", err ? err->message : "unknown reason"));
sent_max_size_warning = FALSE;
}
} else {
GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
("Error sending UDP packets"), ("client %s, reason: %s",
gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
(err != NULL) ? err->message : "unknown reason"));
for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
if (messages[i].address != msg->address)
break;
}
GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
}
/* ignore any errors and try sending the rest */
g_clear_error (&err);
ret = skip;
}
g_assert (ret <= num_messages);
messages += ret;
num_messages -= ret;
}
return TRUE;
}
static GstFlowReturn
gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
guint num_buffers, guint8 * mem_nums, guint total_mem_num)
{
GstOutputMessage *msgs;
gboolean send_duplicates;
GstUDPClient **clients;
GOutputVector *vecs;
GstMapInfo *map_infos;
GstFlowReturn flow_ret;
guint num_addr_v4, num_addr_v6;
guint num_addr, num_msgs;
GError *err = NULL;
guint i, j, mem;
gsize size = 0;
GList *l;
send_duplicates = sink->send_duplicates;
g_mutex_lock (&sink->client_lock);
if (send_duplicates) {
num_addr_v4 = sink->num_v4_all;
num_addr_v6 = sink->num_v6_all;
} else {
num_addr_v4 = sink->num_v4_unique;
num_addr_v6 = sink->num_v6_unique;
}
num_addr = num_addr_v4 + num_addr_v6;
if (num_addr == 0)
goto no_clients;
clients = g_newa (GstUDPClient *, num_addr);
for (l = sink->clients, i = 0; l != NULL; l = l->next) {
GstUDPClient *client = l->data;
clients[i++] = gst_udp_client_ref (client);
for (j = 1; send_duplicates && j < client->add_count; ++j)
clients[i++] = gst_udp_client_ref (client);
}
g_assert_cmpuint (i, ==, num_addr);
g_mutex_unlock (&sink->client_lock);
GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
num_buffers, total_mem_num, num_addr);
vecs = g_newa (GOutputVector, total_mem_num);
map_infos = g_newa (GstMapInfo, total_mem_num);
num_msgs = num_addr * num_buffers;
msgs = g_newa (GstOutputMessage, num_msgs);
/* populate first num_buffers messages with output vectors for the buffers */
for (i = 0, mem = 0; i < num_buffers; ++i) {
size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
msgs[i].vectors = &vecs[mem];
msgs[i].num_vectors = mem_nums[i];
msgs[i].num_control_messages = 0;
msgs[i].control_messages = NULL;
msgs[i].address = clients[0]->addr;
mem += mem_nums[i];
}
/* FIXME: how about some locking? (there wasn't any before either, but..) */
sink->bytes_to_serve += size;
/* now copy the pre-filled num_buffer messages over to the next num_buffer
* messages for the next client, where we also change the target adddress */
for (i = 1; i < num_addr; ++i) {
for (j = 0; j < num_buffers; ++j) {
msgs[i * num_buffers + j] = msgs[j];
msgs[i * num_buffers + j].address = clients[i]->addr;
}
}
/* now send it! */
{
gboolean ret;
/* no IPv4 socket? Send it all from the IPv6 socket then.. */
if (sink->used_socket == NULL) {
ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
msgs, num_msgs);
} else {
guint num_msgs_v4 = num_buffers * num_addr_v4;
guint num_msgs_v6 = num_buffers * num_addr_v6;
/* FIXME: assumes clients are sorted in our list! */
ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
msgs, num_msgs_v4);
if (!ret)
goto cancelled;
ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
msgs + num_msgs_v4, num_msgs_v6);
}
if (!ret)
goto cancelled;
}
flow_ret = GST_FLOW_OK;
/* now update stats */
g_mutex_lock (&sink->client_lock);
for (i = 0; i < num_addr; ++i) {
GstUDPClient *client = clients[i];
for (j = 0; j < num_buffers; ++j) {
gsize bytes_sent;
bytes_sent = msgs[i * num_buffers + j].bytes_sent;
client->bytes_sent += bytes_sent;
client->packets_sent++;
sink->bytes_served += bytes_sent;
}
gst_udp_client_unref (client);
}
g_mutex_unlock (&sink->client_lock);
out:
for (i = 0; i < mem; ++i)
gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
return flow_ret;;
no_clients:
{
g_mutex_unlock (&sink->client_lock);
GST_LOG_OBJECT (sink, "no clients");
return GST_FLOW_OK;
}
cancelled:
{
GST_INFO_OBJECT (sink, "cancelled");
g_clear_error (&err);
flow_ret = GST_FLOW_FLUSHING;
g_mutex_lock (&sink->client_lock);
for (i = 0; i < num_addr; ++i)
gst_udp_client_unref (clients[i]);
g_mutex_unlock (&sink->client_lock);
goto out;
}
}
static GstFlowReturn
gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
GstMultiUDPSink *sink;
GstBuffer **buffers;
GstFlowReturn flow;
guint8 *mem_nums;
guint total_mems;
guint i, num_buffers;
sink = GST_MULTIUDPSINK_CAST (bsink);
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
buffers = g_newa (GstBuffer *, num_buffers);
mem_nums = g_newa (guint8, num_buffers);
for (i = 0, total_mems = 0; i < num_buffers; ++i) {
buffers[i] = gst_buffer_list_get (buffer_list, i);
mem_nums[i] = gst_buffer_n_memory (buffers[i]);
total_mems += mem_nums[i];
}
flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
mem_nums, total_mems);
return flow;
no_data:
{
GST_LOG_OBJECT (sink, "empty buffer");
return GST_FLOW_OK;
}
}
static GstFlowReturn
gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{

View file

@ -38,6 +38,28 @@ G_BEGIN_DECLS
typedef struct _GstMultiUDPSink GstMultiUDPSink;
typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass;
#if GLIB_CHECK_VERSION (2, 43, 2)
#define HAVE_G_SOCKET_SEND_MESSAGES
#endif
#ifndef HAVE_G_SOCKET_SEND_MESSAGES
/* same as GOutputMessage used for g_socket_send_messages() */
typedef struct {
/*< private >*/
GSocketAddress *address;
GOutputVector *vectors;
guint num_vectors;
guint bytes_sent;
GSocketControlMessage **control_messages;
guint num_control_messages;
} GstOutputMessage;
#else
typedef GOutputMessage GstOutputMessage;
#endif /* HAVE_G_SOCKET_SEND_MESSAGES*/
typedef struct {
gint ref_count; /* for memory management */
gint add_count; /* how often this address has been added */
@ -61,6 +83,7 @@ struct _GstMultiUDPSink {
GSocket *used_socket, *used_socket_v6;
GCancellable *cancellable;
/* client management */
GMutex client_lock;
GList *clients;
guint num_v4_unique; /* number IPv4 clients (excluding duplicates) */
@ -68,6 +91,7 @@ struct _GstMultiUDPSink {
guint num_v6_unique; /* number IPv6 clients (excluding duplicates) */
guint num_v6_all; /* number IPv6 clients (including duplicates) */
/* pre-allocated scrap space for render function */
GOutputVector *vec;
GstMapInfo *map;