multifdsink: Add a test involving a slow client

https://bugzilla.gnome.org/show_bug.cgi?id=774908
This commit is contained in:
Jan Alexander Steffens (heftig) 2016-11-29 16:26:22 +01:00 committed by Sebastian Dröge
parent 8b9ebd8f88
commit 9bdf7ff6d0

View file

@ -159,10 +159,10 @@ G_STMT_START { \
char data[size + 1]; \
int nbytes; \
\
GST_DEBUG ("%s: reading %d bytes", msg, size); \
GST_LOG ("%s: reading %d bytes", msg, size); \
nbytes = read (fd, data, size); \
data[size] = 0; \
GST_DEBUG ("%s: read %d bytes", msg, nbytes); \
GST_LOG ("%s: read %d bytes", msg, nbytes); \
fail_if (nbytes < size); \
fail_unless (memcmp (data, ref, size) == 0, \
"data read '%s' differs from '%s'", data, ref); \
@ -797,6 +797,155 @@ GST_START_TEST (test_client_next_keyframe)
GST_END_TEST;
/* number of 16-byte chunks.
* should be bigger than any OS pipe buffer, hopefully */
#define BIG_BUFFER_MULT (16 * 1024)
static GstBuffer *
gst_new_buffer_big (int i)
{
GstMapInfo info;
gchar *data;
gint j;
GstBuffer *buffer = gst_buffer_new_and_alloc (16 * BIG_BUFFER_MULT);
/* copy some id */
g_assert (gst_buffer_map (buffer, &info, GST_MAP_WRITE));
data = (gchar *) info.data;
for (j = 0; j < BIG_BUFFER_MULT; j++) {
g_snprintf (data + 16 * j, 16, "deadbee%08x", i);
}
gst_buffer_unmap (buffer, &info);
return buffer;
}
#define fail_unless_read_big(msg,fd,i) \
G_STMT_START { \
char ref[16]; \
int j; \
g_snprintf (ref, 16, "deadbee%08x", i); \
for (j = 0; j < BIG_BUFFER_MULT; j++) { \
fail_unless_read (msg, fd, 16, ref); \
} \
} G_STMT_END;
#define fail_unless_eof(msg,fd) \
G_STMT_START { \
char data; \
int nbytes; \
\
GST_LOG ("%s: checking for EOF", msg); \
nbytes = read (fd, &data, 1); \
GST_LOG ("%s: read %d bytes", msg, nbytes); \
fail_if (nbytes != 0, "%s: not at EOF (%d)", msg, nbytes); \
} G_STMT_END;
static gint
get_buffers_queued (GstElement * sink)
{
gint buffers;
g_object_get (sink, "buffers-queued", &buffers, NULL);
return buffers;
}
static gint
get_num_handles (GstElement * sink)
{
gint handles;
g_object_get (sink, "num-handles", &handles, NULL);
return handles;
}
/* test kicking out clients */
GST_START_TEST (test_client_kick)
{
GstElement *sink;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
int pfd3[2];
gint i, initial_buffers = 3, num_buffers = 0;
sink = setup_multifdsink ();
g_object_set (sink, "units-max", initial_buffers, NULL);
fail_if (pipe (pfd1) == -1);
fail_if (pipe (pfd2) == -1);
fail_if (pipe (pfd3) == -1);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
gst_check_setup_events (mysrcpad, sink, caps, GST_FORMAT_BYTES);
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
g_signal_emit_by_name (sink, "add", pfd2[1]);
g_signal_emit_by_name (sink, "add", pfd3[1]);
/* push initial buffers in */
for (i = 0; i < initial_buffers; i++) {
GstBuffer *buffer = gst_new_buffer_big (i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
num_buffers++;
GST_DEBUG ("Pushed buffer #%d; %d buffers queued", i,
get_buffers_queued (sink));
}
/* check initial state */
fail_unless_num_handles (sink, 3);
for (i = 0; i < initial_buffers; i++) {
fail_unless_read_big ("client 1", pfd1[0], i);
fail_unless_read_big ("client 3", pfd3[0], i);
GST_DEBUG ("Read buffer #%d", i);
}
/* check that all 3 clients still exist */
fail_unless_num_handles (sink, 3);
/* now push buffers until client 2 gets kicked.
* we don't know how much to push because both the element itself
* and the OS pipes have internal buffering of unknown size */
for (i = initial_buffers; get_num_handles (sink) == 3; i++) {
GstBuffer *buffer = gst_new_buffer_big (i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
num_buffers++;
GST_DEBUG ("Pushed buffer #%d; %d buffers queued", i,
get_buffers_queued (sink));
}
/* check that 2 clients remain */
fail_unless_num_handles (sink, 2);
/* read the data we've pushed until now */
for (i = initial_buffers; i < num_buffers; i++) {
fail_unless_read_big ("client 1", pfd1[0], i);
fail_unless_read_big ("client 3", pfd3[0], i);
GST_DEBUG ("Read buffer #%d", i);
}
GST_DEBUG ("cleaning up multifdsink");
g_signal_emit_by_name (sink, "remove", pfd1[1]);
g_signal_emit_by_name (sink, "remove", pfd3[1]);
fail_unless (close (pfd1[1]) == 0);
fail_unless (close (pfd3[1]) == 0);
fail_unless_eof ("client 1", pfd1[0]);
fail_unless_eof ("client 3", pfd3[0]);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
/* FIXME: add test simulating chained oggs where:
* sync-method is burst-on-connect
* (when multifdsink actually does burst-on-connect based on byte size, not
@ -820,6 +969,7 @@ multifdsink_suite (void)
tcase_add_test (tc_chain, test_burst_client_bytes_keyframe);
tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe);
tcase_add_test (tc_chain, test_client_next_keyframe);
tcase_add_test (tc_chain, test_client_kick);
return s;
}