gst/tcp/gstmultifdsink.*: Added shiny new burst-on-connect methods.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_unit_type_get_type), (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read),
(find_syncframe), (find_limits), (assign_value),
(count_burst_unit), (gst_multi_fd_sink_new_client),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render),
(gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property),
(gst_multi_fd_sink_change_state):
* gst/tcp/gstmultifdsink.h:
Added shiny new burst-on-connect methods.
Add properties to control the minimal amount of data queued.
Small cleanups.
API: bytes-min property
API: time-min property
API: buffers-min property
API: burst-unit property
API: burst-value property
API: add-full signal
* gst/tcp/gsttcp-marshal.list:
Added new marshaller code for the new signal.
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(multifdsink_suite):
Added testcases for new burst methods.
This commit is contained in:
Wim Taymans 2006-06-19 17:12:57 +00:00
parent 7623b0d79a
commit 702d5980d3
5 changed files with 1091 additions and 188 deletions

View file

@ -1,3 +1,33 @@
2006-06-19 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type),
(gst_unit_type_get_type), (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read),
(find_syncframe), (find_limits), (assign_value),
(count_burst_unit), (gst_multi_fd_sink_new_client),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render),
(gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property),
(gst_multi_fd_sink_change_state):
* gst/tcp/gstmultifdsink.h:
Added shiny new burst-on-connect methods.
Add properties to control the minimal amount of data queued.
Small cleanups.
API: bytes-min property
API: time-min property
API: buffers-min property
API: burst-unit property
API: burst-value property
API: add-full signal
* gst/tcp/gsttcp-marshal.list:
Added new marshaller code for the new signal.
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(multifdsink_suite):
Added testcases for new burst methods.
2006-06-19 Edward Hervey <edward@fluendo.com>
* ext/theora/theoradec.c: (clip_buffer), (theora_dec_push):

File diff suppressed because it is too large Load diff

View file

@ -73,9 +73,15 @@ typedef enum
/**
* GstSyncMethod:
* @GST_SYNC_METHOD_LATEST : client receives most recent buffer
* @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe
* @GST_SYNC_METHOD_LATEST_KEYFRAME: client receives latest keyframe (burst)
* @GST_SYNC_METHOD_LATEST : client receives most recent buffer
* @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe
* @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst)
* @GST_SYNC_METHOD_BURST : client receives specific amount of data
* @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data
* starting from latest keyframe
* @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from
* a keyframe, or if there is not enough data after
* the keyframe, starting before the keyframe
*
* This enum defines the selection of the first buffer that is sent
* to a new client.
@ -85,18 +91,23 @@ typedef enum
GST_SYNC_METHOD_LATEST,
GST_SYNC_METHOD_NEXT_KEYFRAME,
GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME,
GST_SYNC_METHOD_BURST_WITH_KEYFRAME,
} GstSyncMethod;
/**
* GstUnitType:
* @GST_UNIT_TYPE_BUFFERS: a buffer
* @GST_UNIT_TYPE_TIME : timeunits (in nanoseconds)
* @GST_UNIT_TYPE_BYTES : bytes
* @GST_UNIT_TYPE_UNDEFINED: undefined
* @GST_UNIT_TYPE_BUFFERS : buffers
* @GST_UNIT_TYPE_TIME : timeunits (in nanoseconds)
* @GST_UNIT_TYPE_BYTES : bytes
*
* The units used to specify limits.
*/
typedef enum
{
GST_UNIT_TYPE_UNDEFINED,
GST_UNIT_TYPE_BUFFERS,
GST_UNIT_TYPE_TIME,
GST_UNIT_TYPE_BYTES,
@ -144,6 +155,13 @@ typedef struct {
gboolean caps_sent;
gboolean new_connection;
/* method to sync client when connecting */
GstSyncMethod sync_method;
GstUnitType burst_min_unit;
guint64 burst_min_value;
GstUnitType burst_max_unit;
guint64 burst_max_value;
GstCaps *caps; /* caps of last queued buffer */
/* stats */
@ -192,12 +210,24 @@ struct _GstMultiFdSink {
gboolean running; /* the thread state */
GThread *thread; /* the sender thread */
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstUnitType unit_type;/* the type of the units */
gint units_max; /* max units to queue */
gint units_max; /* max units to queue for a client */
gint units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */
GstSyncMethod sync_method; /* what method to use for connecting clients */
GstSyncMethod def_sync_method; /* what method to use for connecting clients */
GstUnitType def_burst_unit;
guint64 def_burst_value;
/* these values are used to control the amount of data
* kept in the queues. It allows clients to perform a burst
* on connect. */
gint bytes_min; /* min number of bytes to queue */
gint64 time_min; /* min time to queue */
gint buffers_min; /* min number of buffers to queue */
/* stats */
gint buffers_queued; /* number of queued buffers */
@ -212,6 +242,9 @@ struct _GstMultiFdSinkClass {
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
GstUnitType format, guint64 value,
GstUnitType max_unit, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink);
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
@ -231,10 +264,14 @@ struct _GstMultiFdSinkClass {
GType gst_multi_fd_sink_get_type (void);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
GstUnitType min_unit, guint64 min_value,
GstUnitType max_unit, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
G_END_DECLS
#endif /* __GST_MULTI_FD_SINK_H__ */

View file

@ -1,4 +1,5 @@
VOID:STRING,UINT
VOID:INT
VOID:INT,BOXED
VOID:INT,BOOLEAN,INT,UINT64,INT,UINT64
BOXED:INT

View file

@ -125,6 +125,7 @@ GST_START_TEST (test_add_client)
memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("reading");
fail_if (read (pfd[0], data, 4) < 4);
fail_unless (strncmp (data, "dead", 4) == 0);
wait_bytes_served (sink, 4);
@ -441,6 +442,358 @@ GST_START_TEST (test_change_streamheader)
GST_END_TEST;
/* keep 100 bytes and burst 80 bytes to clients */
GST_START_TEST (test_burst_client_bytes)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
guint64 bytes_served;
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
fail_if (pipe (pfd2) == -1);
fail_if (pipe (pfd3) == -1);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* check that at least 7 buffers (112 bytes) are in the queue */
g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
fail_if (buffers_queued != 7);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
g_signal_emit_by_name (sink, "add_full", pfd2[1], 3,
3, (guint64) 50, 3, (guint64) 200);
g_signal_emit_by_name (sink, "add_full", pfd3[1], 3,
3, (guint64) 50, 3, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
* the max alows it) */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
/* keep 100 bytes and burst 80 bytes to clients */
GST_START_TEST (test_burst_client_bytes_keyframe)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
guint64 bytes_served;
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
fail_if (pipe (pfd2) == -1);
fail_if (pipe (pfd3) == -1);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
/* mark most buffers as delta */
if (i != 0 && i != 4 && i != 8)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* check that at least 7 buffers (112 bytes) are in the queue */
g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
fail_if (buffers_queued != 7);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
g_signal_emit_by_name (sink, "add_full", pfd2[1], 4,
3, (guint64) 50, 3, (guint64) 90);
g_signal_emit_by_name (sink, "add_full", pfd3[1], 4,
3, (guint64) 50, 3, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so get one below min */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 2 buffers (32 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
/* keep 100 bytes and burst 80 bytes to clients */
GST_START_TEST (test_burst_client_bytes_with_keyframe)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
guint64 bytes_served;
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
fail_if (pipe (pfd2) == -1);
fail_if (pipe (pfd3) == -1);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
/* mark most buffers as delta */
if (i != 0 && i != 4 && i != 8)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* check that at least 7 buffers (112 bytes) are in the queue */
g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
fail_if (buffers_queued != 7);
/* now add the clients */
g_signal_emit_by_name (sink, "add", pfd1[1]);
g_signal_emit_by_name (sink, "add_full", pfd2[1], 5,
3, (guint64) 50, 3, (guint64) 90);
g_signal_emit_by_name (sink, "add_full", pfd3[1], 5,
3, (guint64) 50, 3, (guint64) 50);
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
gst_buffer_set_caps (buffer, caps);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = (gchar *) GST_BUFFER_DATA (buffer);
g_snprintf (data, 16, "deadbee%08x", i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so send min */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
/* FIXME: add test simulating chained oggs where:
* sync-method is burst-on-connect
* (when multifdsink actually does burst-on-connect based on byte size, not
@ -448,7 +801,6 @@ GST_END_TEST;
* an old client still needs to read from before the new streamheaders
* a new client gets the new streamheaders
*/
Suite *
multifdsink_suite (void)
{
@ -460,6 +812,9 @@ multifdsink_suite (void)
tcase_add_test (tc_chain, test_add_client);
tcase_add_test (tc_chain, test_streamheader);
tcase_add_test (tc_chain, test_change_streamheader);
tcase_add_test (tc_chain, test_burst_client_bytes);
tcase_add_test (tc_chain, test_burst_client_bytes_keyframe);
tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe);
return s;
}