multisocketsink: Map GstMemorys individually when sending

If a buffer is made up of non-contiguous `GstMemory`s `gst_buffer_map`
has to copy all the data into a new `GstMemory` which is contiguous.  By
mapping all the `GstMemory`s individually and then using scatter-gather
IO we avoid this situation.

This is a preparatory step for adding support to multisocketsink for
sending file descriptors, where a GstBuffer may be made up of several
`GstMemory`s, some of which are backed by a memfd or file, but I think this
patch is valid and useful on its own.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=746150
This commit is contained in:
William Manley 2015-03-13 12:49:31 +00:00 committed by Wim Taymans
parent 274984e83b
commit b8232a7467
2 changed files with 176 additions and 18 deletions

View file

@ -600,6 +600,100 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
return ret;
}
/**
* map_memory_output_vector_n:
* @buf: The #GstBuffer that should be mapped
* @offset: Offset into the buffer that should be mapped
* @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
* @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
* @num_vectors: the number of elements in @vectors to prevent buffer overruns
*
* Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
* I/O to send the data over a socket. The whole buffer won't be mapped into
* memory if it consists of more than @num_vectors #GstMemory s.
*
* Use #unmap_n_memorys after you are
* finished with the mappings.
*
* Returns: The number of GstMemorys mapped
*/
static int
map_n_memory_output_vector (GstBuffer * buf, size_t offset,
GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
{
guint mem_idx, mem_len;
gsize mem_skip;
size_t maxsize;
int i;
g_return_val_if_fail (num_vectors > 0, 0);
memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
maxsize = gst_buffer_get_size (buf) - offset;
if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
&mem_skip))
g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
"length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
for (i = 0; i < mem_len && i < num_vectors; i++) {
GstMapInfo map = { 0 };
GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
if (!gst_memory_map (mem, &map, GST_MAP_READ))
g_error ("Unable to map memory %p. This should never happen.", mem);
if (i == 0) {
vectors[i].buffer = map.data + mem_skip;
vectors[i].size = map.size - mem_skip;
} else {
vectors[i].buffer = map.data;
vectors[i].size = map.size;
}
mapinfo[i] = map;
}
return i;
}
/**
* map_n_memory_output_vector:
* @buf: The #GstBuffer that should be mapped
* @offset: Offset into the buffer that should be mapped
* @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
* @num_vectors: the number of elements in @vectors to prevent buffer overruns
*
* Returns: The number of GstMemorys mapped
*/
static void
unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
{
int i;
g_return_if_fail (num_mappings > 0);
for (i = 0; i < num_mappings; i++)
gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
}
static gssize
gst_multi_socket_sink_write (GstMultiSocketSink * sink,
GSocket * sock, GstBuffer * buffer, gsize bufoffset,
GCancellable * cancellable, GError ** err)
{
GstMapInfo maps[8];
GOutputVector vec[8];
guint mems_mapped;
gssize wrote;
mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
wrote =
g_socket_send_message (sock, NULL, vec, mems_mapped, NULL, 0, 0,
cancellable, err);
unmap_n_memorys (maps, mems_mapped);
return wrote;
}
/* Handle a write on a client,
* which indicates a read request from a client.
*
@ -644,8 +738,6 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
more = TRUE;
do {
gint maxsize;
if (!mhclient->sending) {
/* client is not working on a buffer */
if (mhclient->bufpos == -1) {
@ -725,22 +817,12 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
if (mhclient->sending) {
gssize wrote;
GstBuffer *head;
GstMapInfo map;
/* pick first buffer from list */
head = GST_BUFFER (mhclient->sending->data);
gst_buffer_map (head, &map, GST_MAP_READ);
maxsize = map.size - mhclient->bufoffset;
/* FIXME: specific */
/* try to write the complete buffer */
wrote =
g_socket_send (mhclient->handle.socket,
(gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
&err);
gst_buffer_unmap (head, &map);
wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
mhclient->bufoffset, sink->cancellable, &err);
if (wrote < 0) {
/* hmm error.. */
@ -755,7 +837,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
goto write_error;
}
} else {
if (wrote < maxsize) {
if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
/* partial write, try again now */
GST_LOG_OBJECT (sink,
"partial write on %p of %" G_GSSIZE_FORMAT " bytes",

View file

@ -180,12 +180,15 @@ GST_START_TEST (test_add_client)
gst_check_setup_events (mysrcpad, sink, caps, GST_FORMAT_BYTES);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_buffer_fill (buffer, 0, "dead", 4);
gst_buffer_append_memory (buffer,
gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) " good", 5,
0, 5, NULL, NULL));
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("reading");
fail_if (read_handle (srcsocket, data, 4) < 4);
fail_unless (strncmp (data, "dead", 4) == 0);
wait_bytes_served (sink, 4);
fail_if (read_handle (srcsocket, data, 9) < 9);
fail_unless (strncmp (data, "dead good", 9) == 0);
wait_bytes_served (sink, 9);
GST_DEBUG ("cleaning up multisocketsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@ -200,6 +203,78 @@ GST_START_TEST (test_add_client)
GST_END_TEST;
typedef struct
{
GSocket *sinksocket, *srcsocket;
GstElement *sink;
} TestSinkAndSocket;
static void
setup_sink_with_socket (TestSinkAndSocket * tsas)
{
GstCaps *caps = NULL;
tsas->sink = setup_multisocketsink ();
fail_unless (setup_handles (&tsas->sinksocket, &tsas->srcsocket));
ASSERT_SET_STATE (tsas->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
/* add the client */
g_signal_emit_by_name (tsas->sink, "add", tsas->sinksocket);
caps = gst_caps_from_string ("application/x-gst-check");
gst_check_setup_events (mysrcpad, tsas->sink, caps, GST_FORMAT_BYTES);
gst_caps_unref (caps);
}
static void
teardown_sink_with_socket (TestSinkAndSocket * tsas)
{
if (tsas->sink != NULL) {
ASSERT_SET_STATE (tsas->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multisocketsink (tsas->sink);
tsas->sink = 0;
}
if (tsas->sinksocket != NULL) {
g_object_unref (tsas->sinksocket);
tsas->sinksocket = 0;
}
if (tsas->srcsocket != NULL) {
g_object_unref (tsas->srcsocket);
tsas->srcsocket = 0;
}
}
GST_START_TEST (test_sending_buffers_with_9_gstmemorys)
{
TestSinkAndSocket tsas = { 0 };
GstBuffer *buffer;
int i;
const char *numbers[9] = { "one", "two", "three", "four", "five", "six",
"seven", "eight", "nine"
};
const char numbers_concat[] = "onetwothreefourfivesixseveneightnine";
gchar data[sizeof (numbers_concat)];
int len = sizeof (numbers_concat) - 1;
setup_sink_with_socket (&tsas);
buffer = gst_buffer_new ();
for (i = 0; i < sizeof (numbers) / sizeof (*numbers); i++)
gst_buffer_append_memory (buffer,
gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) numbers[i],
strlen (numbers[i]), 0, strlen (numbers[i]), NULL, NULL));
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("reading");
fail_if (read_handle (tsas.srcsocket, data, len) < len);
fail_unless (strncmp (data, numbers_concat, len) == 0);
teardown_sink_with_socket (&tsas);
}
GST_END_TEST;
/* from the given two data buffers, create two streamheader buffers and
* some caps that match it, and store them in the given pointers
* returns one ref to each of the buffers and the caps */
@ -874,6 +949,7 @@ multisocketsink_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_no_clients);
tcase_add_test (tc_chain, test_add_client);
tcase_add_test (tc_chain, test_sending_buffers_with_9_gstmemorys);
tcase_add_test (tc_chain, test_streamheader);
tcase_add_test (tc_chain, test_change_streamheader);
tcase_add_test (tc_chain, test_burst_client_bytes);