From 702d5980d3d36ce907fb461e20a68d9d543211f3 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 19 Jun 2006 17:12:57 +0000 Subject: [PATCH] gst/tcp/gstmultifdsink.*: Added shiny new burst-on-connect methods. Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type), (gst_unit_type_get_type), (gst_multi_fd_sink_class_init), (gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read), (find_syncframe), (find_limits), (assign_value), (count_burst_unit), (gst_multi_fd_sink_new_client), (gst_multi_fd_sink_handle_client_write), (gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render), (gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property), (gst_multi_fd_sink_change_state): * gst/tcp/gstmultifdsink.h: Added shiny new burst-on-connect methods. Add properties to control the minimal amount of data queued. Small cleanups. API: bytes-min property API: time-min property API: buffers-min property API: burst-unit property API: burst-value property API: add-full signal * gst/tcp/gsttcp-marshal.list: Added new marshaller code for the new signal. * tests/check/elements/multifdsink.c: (GST_START_TEST), (multifdsink_suite): Added testcases for new burst methods. --- ChangeLog | 30 ++ gst/tcp/gstmultifdsink.c | 838 +++++++++++++++++++++++------ gst/tcp/gstmultifdsink.h | 53 +- gst/tcp/gsttcp-marshal.list | 1 + tests/check/elements/multifdsink.c | 357 +++++++++++- 5 files changed, 1091 insertions(+), 188 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7793b2aad6..0121bcb068 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,33 @@ +2006-06-19 Wim Taymans + + * gst/tcp/gstmultifdsink.c: (gst_sync_method_get_type), + (gst_unit_type_get_type), (gst_multi_fd_sink_class_init), + (gst_multi_fd_sink_init), (gst_multi_fd_sink_add_full), + (gst_multi_fd_sink_add), (gst_multi_fd_sink_handle_client_read), + (find_syncframe), (find_limits), (assign_value), + (count_burst_unit), (gst_multi_fd_sink_new_client), + (gst_multi_fd_sink_handle_client_write), + (gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_render), + (gst_multi_fd_sink_set_property), (gst_multi_fd_sink_get_property), + (gst_multi_fd_sink_change_state): + * gst/tcp/gstmultifdsink.h: + Added shiny new burst-on-connect methods. + Add properties to control the minimal amount of data queued. + Small cleanups. + API: bytes-min property + API: time-min property + API: buffers-min property + API: burst-unit property + API: burst-value property + API: add-full signal + + * gst/tcp/gsttcp-marshal.list: + Added new marshaller code for the new signal. + + * tests/check/elements/multifdsink.c: (GST_START_TEST), + (multifdsink_suite): + Added testcases for new burst methods. + 2006-06-19 Edward Hervey * ext/theora/theoradec.c: (clip_buffer), (theora_dec_push): diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 7dfce3e766..4381c53cbf 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -31,19 +31,24 @@ * each descriptor added, the "client-added" signal will be called. * * + * As of version 0.10.8, a client can also be added with the "add-full" signal + * that allows for more control over what and how much data a client + * initially receives. + * + * * Clients can be removed from multifdsink by emiting the "remove" signal. For * each descriptor removed, the "client-removed" signal will be called. The * "client-removed" signal can also be fired when multifdsink decides that a * client is not active anymore or, depending on the value of the * "recover-policy," if the client is reading to slow. * In all cases, multifdsink will never ever close a file descriptor itself. - * The user of multifdsink is responsible for closing the file descriptor. + * The user of multifdsink is responsible for closing all file descriptors. * This can for example be done in response to the "client-fd-removed" signal. * Note that multifdsink still has a reference to the file descriptor when the * "client-removed" signal is emited so that "get-stats" can be performed on * the descriptor; It is therefore not allowed to close the file descriptor in - * the "client-removed" signal, use the "client-fd-removed" signal to close the - * fd. + * the "client-removed" signal, use the "client-fd-removed" signal to safely + * close the fd. * * * Multifdsink internally keeps a queue of the incomming buffers and uses a @@ -53,11 +58,20 @@ * * * When adding a client to multifdsink, the "sync-method" property will define - * which buffer will be sent first to the client. Clients can be sent - * respectively the most recent buffer (which might not be decodable by the - * client when it is not a keyframe), the next keyframe received in multifdsink - * (which can take some time depending on the keyframe rate, or the last - * received keyframe (which will cause a burst-on-connect). + * which buffer in the queued buffers will be sent first to the client. Clients + * can be sent respectively the most recent buffer (which might not be decodable + * by the client when it is not a keyframe), the next keyframe received in + * multifdsink (which can take some time depending on the keyframe rate, or the + * last received keyframe (which will cause a simpl burst-on-connect). + * Multifdsink will always keep at least one keyframe in its internal buffers + * when the sync-mode is set to latest-keyframe. + * + * + * Multifdsink can be instructed to keep at least a minimum amount of data + * expressed in time or byte units in its internal queues with the the + * "time-min" and "bytes-min" properties respectively. These properties are + * usefull if the application adds clients with the "add-full" signal to + * make sure that a burst connect can actually be honored. * * * When streaming data, clients are allowed to read at a different rate than @@ -78,12 +92,14 @@ * buffer queue. * * - * multifdsink will synchronize on the clock before serving the buffers to the - * clients. + * multifdsink will by default synchronize on the clock before serving the buffers + * to the clients. This behaviour can be disabled by setting the sync + * property to FALSE. Multifdsink will be default not do QoS and will never + * drop late buffers. * * * - * Last reviewed on 2006-04-28 (0.10.7) + * Last reviewed on 2006-06-13 (0.10.9) */ #ifdef HAVE_CONFIG_H @@ -147,6 +163,7 @@ enum { /* methods */ SIGNAL_ADD, + SIGNAL_ADD_BURST, SIGNAL_REMOVE, SIGNAL_CLEAR, SIGNAL_GET_STATS, @@ -160,38 +177,51 @@ enum }; /* this is really arbitrarily chosen */ -#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE -#define DEFAULT_MODE GST_FDSET_MODE_POLL +#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE +#define DEFAULT_MODE GST_FDSET_MODE_POLL #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 +#define DEFAULT_TIME_MIN -1 +#define DEFAULT_BYTES_MIN -1 +#define DEFAULT_BUFFERS_MIN -1 #define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 -#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE -#define DEFAULT_TIMEOUT 0 -#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST +#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE +#define DEFAULT_TIMEOUT 0 +#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST + +#define DEFAULT_BURST_UNIT GST_UNIT_TYPE_UNDEFINED +#define DEFAULT_BURST_VALUE 0 enum { - ARG_0, - ARG_PROTOCOL, - ARG_MODE, - ARG_BUFFERS_QUEUED, - ARG_BYTES_QUEUED, - ARG_TIME_QUEUED, + PROP_0, + PROP_PROTOCOL, + PROP_MODE, + PROP_BUFFERS_QUEUED, + PROP_BYTES_QUEUED, + PROP_TIME_QUEUED, - ARG_UNIT_TYPE, - ARG_UNITS_MAX, - ARG_UNITS_SOFT_MAX, + PROP_UNIT_TYPE, + PROP_UNITS_MAX, + PROP_UNITS_SOFT_MAX, - ARG_BUFFERS_MAX, - ARG_BUFFERS_SOFT_MAX, + PROP_BUFFERS_MAX, + PROP_BUFFERS_SOFT_MAX, - ARG_RECOVER_POLICY, - ARG_TIMEOUT, - ARG_SYNC_METHOD, - ARG_BYTES_TO_SERVE, - ARG_BYTES_SERVED, + PROP_TIME_MIN, + PROP_BYTES_MIN, + PROP_BUFFERS_MIN, + + PROP_RECOVER_POLICY, + PROP_TIMEOUT, + PROP_SYNC_METHOD, + PROP_BYTES_TO_SERVE, + PROP_BYTES_SERVED, + + PROP_BURST_UNIT, + PROP_BURST_VALUE, }; #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) @@ -231,6 +261,13 @@ gst_sync_method_get_type (void) {GST_SYNC_METHOD_LATEST_KEYFRAME, "Serve everything since the latest keyframe (burst)", "latest-keyframe"}, + {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"}, + {GST_SYNC_METHOD_BURST_KEYFRAME, + "Serve burst-value data starting on a keyframe", + "burst-keyframe"}, + {GST_SYNC_METHOD_BURST_WITH_KEYFRAME, + "Serve burst-value data preferably starting on a keyframe", + "burst-with-keyframe"}, {0, NULL, NULL}, }; @@ -240,13 +277,13 @@ gst_sync_method_get_type (void) return sync_method_type; } -#if NOT_IMPLEMENTED #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) static GType gst_unit_type_get_type (void) { static GType unit_type_type = 0; static const GEnumValue unit_type[] = { + {GST_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"}, {GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"}, {GST_UNIT_TYPE_BYTES, "Bytes", "bytes"}, {GST_UNIT_TYPE_TIME, "Time", "time"}, @@ -258,7 +295,6 @@ gst_unit_type_get_type (void) } return unit_type_type; } -#endif #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) static GType @@ -328,74 +364,97 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) gobject_class->get_property = gst_multi_fd_sink_get_property; gobject_class->finalize = gst_multi_fd_sink_finalize; - g_object_class_install_property (gobject_class, ARG_PROTOCOL, + g_object_class_install_property (gobject_class, PROP_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_MODE, + g_object_class_install_property (gobject_class, PROP_MODE, g_param_spec_enum ("mode", "Mode", "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE, DEFAULT_MODE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", - "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT, - DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX, - g_param_spec_int ("buffers-soft-max", "Buffers soft max", + "max number of buffers to queue for a client (-1 = no limit)", -1, + G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), + PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max", + "Buffers soft max", "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_MIN, + g_param_spec_int ("bytes-min", "Bytes min", + "min number of bytes to queue (-1 = as little as possible)", -1, + G_MAXINT, DEFAULT_BYTES_MIN, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_MIN, + g_param_spec_int64 ("time-min", "Time min", + "min number of time to queue (-1 = as litte as possible)", -1, + G_MAXINT64, DEFAULT_TIME_MIN, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_MIN, + g_param_spec_int64 ("buffers-min", "Buffers min", + "min number of buffers to queue (-1 = as litte as possible)", -1, + G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE)); + #if NOT_IMPLEMENTED - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_MAX, g_param_spec_int ("units-max", "Units max", "max number of units to queue (-1 = no limit)", -1, G_MAXINT, DEFAULT_UNITS_MAX, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_UNITS_SOFT_MAX, g_param_spec_int ("units-soft-max", "Units soft max", "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE)); #endif - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFERS_QUEUED, g_param_spec_uint ("buffers-queued", "Buffers queued", "Number of buffers currently queued", 0, G_MAXUINT, 0, G_PARAM_READABLE)); #if NOT_IMPLEMENTED - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_QUEUED, g_param_spec_uint ("bytes-queued", "Bytes queued", "Number of bytes currently queued", 0, G_MAXUINT, 0, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIME_QUEUED, g_param_spec_uint64 ("time-queued", "Time queued", "Number of time currently queued", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); #endif - g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, + g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY, g_param_spec_enum ("recover-policy", "Recover Policy", "How to recover when client reaches the soft max", GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT, g_param_spec_uint64 ("timeout", "Timeout", "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)", 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC_METHOD, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC_METHOD, g_param_spec_enum ("sync-method", "Sync Method", "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE, g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED, g_param_spec_uint64 ("bytes-served", "Bytes served", "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_UNIT, + g_param_spec_enum ("burst-unit", "Burst unit", + "The format of the burst units (when sync-method is burst[[-with]-keyframe])", + GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BURST_VALUE, + g_param_spec_uint64 ("burst-value", "Burst value", + "The amount of burst expressed in burst-unit", + 0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE)); + /** * GstMultiFdSink::add: * @gstmultifdsink: the multifdsink element to emit this signal on @@ -407,6 +466,24 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + /** + * GstMultiFdSink::add-full: + * @gstmultifdsink: the multifdsink element to emit this signal on + * @fd: the file descriptor to add to multifdsink + * @keyframe: start bursting from a keyframe + * @unit_type: the unit-type of @value + * @value: the minimal amount of data to burst expressed in + * @format units. + * + * Hand the given open file descriptor to multifdsink to write to and + * specify the burst parameters for the new connection. + */ + gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] = + g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstMultiFdSinkClass, add_full), + NULL, NULL, gst_tcp_marshal_VOID__INT_BOOLEAN_INT_UINT64_INT_UINT64, + G_TYPE_NONE, 6, G_TYPE_INT, G_TYPE_BOOLEAN, GST_TYPE_UNIT_TYPE, + G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64); /** * GstMultiFdSink::remove: * @gstmultifdsink: the multifdsink element to emit this signal on @@ -510,6 +587,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render); klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); + klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); @@ -533,10 +611,15 @@ gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass) this->unit_type = DEFAULT_UNIT_TYPE; this->units_max = DEFAULT_UNITS_MAX; this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; + this->time_min = DEFAULT_TIME_MIN; + this->bytes_min = DEFAULT_BYTES_MIN; + this->buffers_min = DEFAULT_BUFFERS_MIN; this->recover_policy = DEFAULT_RECOVER_POLICY; this->timeout = DEFAULT_TIMEOUT; - this->sync_method = DEFAULT_SYNC_METHOD; + this->def_sync_method = DEFAULT_SYNC_METHOD; + this->def_burst_unit = DEFAULT_BURST_UNIT; + this->def_burst_value = DEFAULT_BURST_VALUE; this->header_flags = 0; } @@ -555,9 +638,11 @@ gst_multi_fd_sink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } -/* "add" signal implemntation */ +/* "add-full" signal implemntation */ void -gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) +gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, + GstSyncMethod sync_method, GstUnitType min_unit, guint64 min_value, + GstUnitType max_unit, guint64 max_value) { GstTCPClient *client; GList *clink; @@ -567,6 +652,12 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) GST_DEBUG_OBJECT (sink, "[fd %5d] adding client", fd); + /* do limits check if we can */ + if (min_unit == max_unit) { + if (max_value != -1 && min_value != -1 && max_value < min_value) + goto wrong_limits; + } + /* create client datastructure */ client = g_new0 (GstTCPClient, 1); client->fd.fd = fd; @@ -578,28 +669,25 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) client->dropped_buffers = 0; client->avg_queue_size = 0; client->new_connection = TRUE; + client->burst_min_unit = min_unit; + client->burst_min_value = min_value; + client->burst_max_unit = max_unit; + client->burst_max_value = max_value; + client->sync_method = sync_method; /* update start time */ g_get_current_time (&now); client->connect_time = GST_TIMEVAL_TO_TIME (now); client->disconnect_time = 0; - /* send last activity time to connect time */ - client->last_activity_time = GST_TIMEVAL_TO_TIME (now); + /* set last activity time to connect time */ + client->last_activity_time = client->connect_time; CLIENTS_LOCK (sink); /* check the hash to find a duplicate fd */ clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd); - if (clink != NULL) { - client->status = GST_CLIENT_STATUS_DUPLICATE; - CLIENTS_UNLOCK (sink); - GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); - g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, - client->status); - g_free (client); - return; - } + if (clink != NULL) + goto duplicate; /* we can add the fd now */ clink = sink->clients = g_list_prepend (sink->clients, client); @@ -627,6 +715,37 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) g_signal_emit (G_OBJECT (sink), gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd); + + return; + + /* errors */ +wrong_limits: + { + GST_WARNING_OBJECT (sink, + "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%" + G_GUINT64_FORMAT ", unit %d specified when adding client", fd, + min_value, max_value, min_unit); + return; + } +duplicate: + { + client->status = GST_CLIENT_STATUS_DUPLICATE; + CLIENTS_UNLOCK (sink); + GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); + g_signal_emit (G_OBJECT (sink), + gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, + client->status); + g_free (client); + return; + } +} + +/* "add" signal implemntation */ +void +gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) +{ + gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method, + sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1); } /* "remove" signal implemntation */ @@ -739,7 +858,7 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) return result; } -/* should be called with the clientslock held. +/* should be called with the clientslock helt. * Note that we don't close the fd as we didn't open it in the first * place. An application should connect to the client-removed signal and * close the fd itself. @@ -846,13 +965,8 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, fd = client->fd.fd; - if (ioctl (fd, FIONREAD, &avail) < 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", - fd, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - return ret; - } + if (ioctl (fd, FIONREAD, &avail) < 0) + goto ioctl_failed; GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes", fd, avail); @@ -900,11 +1014,21 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, while (avail > 0); } return ret; + + /* ERRORS */ +ioctl_failed: + { + GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", + fd, g_strerror (errno), errno); + client->status = GST_CLIENT_STATUS_ERROR; + return FALSE; + } } /* Queue raw data for this client, creating a new buffer. * This takes ownership of the data by - * setting it as GST_BUFFER_MALLOCDATA() on the created buffer + * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so + * be sure to pass g_free()-able @data. */ static gboolean gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink, @@ -1100,87 +1224,381 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, return TRUE; } +/* find the keyframe in the list of buffers starting the + * search from @idx. @direction as -1 will search backwards, + * 1 will search forwards. + * Returns: the index or -1 if there is no keyframe after idx. + */ +static gint +find_syncframe (GstMultiFdSink * sink, gint idx, gint direction) +{ + gint i, len, result; + + /* take length of queued buffers */ + len = sink->bufqueue->len; + + /* assume we don't find a keyframe */ + result = -1; + + /* then loop over all buffers to find the first keyframe */ + for (i = idx; i >= 0 && i < len; i += direction) { + GstBuffer *buf; + + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + if (is_sync_frame (sink, buf)) { + GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d", + i, idx, direction); + result = i; + break; + } + } + return result; +} + +#define find_next_syncframe(s,i) find_syncframe(s,i,1) +#define find_prev_syncframe(s,i) find_syncframe(s,i,-1) + +/* find the positions in the buffer queue where *_min and *_max + * is satisfied + */ +/* count the amount of data in the buffers and return the index + * that satifies the given limits. + * + * Returns: index @idx in the buffer queue so that the given limits are + * satisfied. TRUE if all the limits could be satisfied, FALSE if not + * enough data was in the queue. + * + * FIXME, this code might now work if any of the units is in buffers... + */ +static gboolean +find_limits (GstMultiFdSink * sink, + gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, + gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) +{ + GstClockTime first, time; + gint i, len, bytes; + gboolean result, max_hit; + + /* take length of queue */ + len = sink->bufqueue->len; + + /* this must hold */ + g_assert (len > 0); + + GST_LOG_OBJECT (sink, + "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT + ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min, + buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max, + GST_TIME_ARGS (time_max)); + + /* do the trivial buffer limit test */ + if (buffers_min != -1 && len < buffers_min) { + *min_idx = len - 1; + *max_idx = len - 1; + return FALSE; + } + + result = FALSE; + /* else count bytes and time */ + first = -1; + bytes = 0; + /* unset limits */ + *min_idx = -1; + *max_idx = -1; + max_hit = FALSE; + + i = 0; + /* loop through the buffers, when a limit is ok, mark it + * as -1, we have at least one buffer in the queue. */ + do { + GstBuffer *buf; + + /* if we checked all min limits, update result */ + if (bytes_min == -1 && time_min == -1 && *min_idx == -1) { + /* don't go below 0 */ + *min_idx = MAX (i - 1, 0); + } + /* if we reached one max limit break out */ + if (max_hit) { + /* i > 0 when we get here, we subtract one to get the position + * of the previous buffer. */ + *max_idx = i - 1; + /* we have valid complete result if we found a min_idx too */ + result = *min_idx != -1; + break; + } + buf = g_array_index (sink->bufqueue, GstBuffer *, i); + + bytes += GST_BUFFER_SIZE (buf); + + /* take timestamp and save for the base first timestamp */ + if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { + if (first == -1) + first = time; + + /* increase max usage if we did not fill enough. Note that + * buffers are sorted from new to old, so the first timestamp is + * bigger than the next one. */ + if (time_min != -1 && first - time >= time_min) + time_min = -1; + if (time_max != -1 && first - time >= time_max) + max_hit = TRUE; + } + /* time is OK or unknown, check and increase if not enough bytes */ + if (bytes_min != -1) { + if (bytes >= bytes_min) + bytes_min = -1; + } + if (bytes_max != -1) { + if (bytes >= bytes_max) { + max_hit = TRUE; + } + } + i++; + } + while (i < len); + + /* if we did not hit the max or min limit, set to buffer size */ + if (*max_idx == -1) + *max_idx = len - 1; + /* make sure min does not exceed max */ + if (*min_idx == -1) + *min_idx = *max_idx; + + return result; +} + +/* parse the unit/value pair and assign it to the result value of the + * right type, leave the other values untouched + * + * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. + */ +static gboolean +assign_value (GstUnitType unit, guint64 value, gint * bytes, gint * buffers, + GstClockTime * time) +{ + gboolean res = TRUE; + + /* set only the limit of the given format to the given value */ + switch (unit) { + case GST_UNIT_TYPE_BUFFERS: + *buffers = (gint) value; + break; + case GST_UNIT_TYPE_TIME: + *time = value; + break; + case GST_UNIT_TYPE_BYTES: + *bytes = (gint) value; + break; + case GST_UNIT_TYPE_UNDEFINED: + default: + res = FALSE; + break; + } + return res; +} + +/* count the index in the buffer queue to satisfy the given unit + * and value pair starting from buffer at index 0. + * + * Returns: TRUE if there was enuough data in the queue to satisfy the + * burst values. @idx contains the index in the buffer that contains enough + * data to satisfy the limits or the last buffer in the queue when the + * function returns FALSE. + */ +static gboolean +count_burst_unit (GstMultiFdSink * sink, gint * min_idx, GstUnitType min_unit, + guint64 min_value, gint * max_idx, GstUnitType max_unit, guint64 max_value) +{ + gint bytes_min = -1, buffers_min = -1; + gint bytes_max = -1, buffers_max = -1; + GstClockTime time_min = -1, time_max = -1; + + assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min); + assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max); + + return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, + max_idx, bytes_max, buffers_max, time_max); +} + /* decide where in the current buffer queue this new client should start - * receiving buffers from + * receiving buffers from. + * This function is called whenever a client is connected and has not yet + * received a buffer. * If this returns -1, it means that we haven't found a good point to * start streaming from yet, and this function should be called again later - * when more buffers have arrived */ + * when more buffers have arrived. + */ static gint gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) { gint result; - switch (sink->sync_method) { + switch (client->sync_method) { + case GST_SYNC_METHOD_LATEST: + /* no syncing, we are happy with whatever the client is going to get */ + GST_LOG_OBJECT (sink, "no client sync needed"); + result = client->bufpos; + break; case GST_SYNC_METHOD_NEXT_KEYFRAME: { + GstBuffer *buf; + /* if the buffer at the head of the queue is a sync point we can proceed, * else we need to skip the buffer and wait for a new one */ GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd, client->bufpos); - /* the client is not yet aligned to a buffer */ - if (client->bufpos < 0) { - result = -1; - } else { - GstBuffer *buf; - gint i; - - for (i = client->bufpos; i >= 0; i--) { - /* get the buffer for the client */ - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - if (is_sync_frame (sink, buf)) { - GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync", - client->fd.fd); - result = i; - goto done; - } else { - /* client is not on a buffer, need to skip this buffer and - * wait some more */ - GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer", - client->fd.fd); - client->bufpos--; - } - } - result = -1; + /* get the buffer for the client */ + buf = g_array_index (sink->bufqueue, GstBuffer *, 0); + if (is_sync_frame (sink, buf)) { + GST_LOG_OBJECT (sink, "[fd %5d] new client, found sync", client->fd.fd); + result = 0; + goto done; } + /* client is not on a syncbuffer, need to skip this buffer and + * wait some more */ + GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer", + client->fd.fd); + client->bufpos = -1; + result = -1; break; } case GST_SYNC_METHOD_LATEST_KEYFRAME: { - /* FIXME for new clients we constantly scan the complete - * buffer queue for sync point whenever a buffer is added. This is - * suboptimal because if we cannot find a sync point the first time, - * the algorithm should behave as GST_SYNC_METHOD_NEXT_KEYFRAME */ - gint i, len; - GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, bursting keyframe", client->fd.fd, client->bufpos); - /* take length of queued buffers */ - len = sink->bufqueue->len; - /* assume we don't find a keyframe */ - result = -1; - /* then loop over all buffers to find the first keyframe */ - for (i = 0; i < len; i++) { - GstBuffer *buf; + /* for new clients we initially scan the complete buffer queue for + * a sync point when a buffer is added. If we don't find a keyframe, + * we need to wait for the next keyframe and so we change the client's + * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME. + */ + result = find_next_syncframe (sink, 0); + if (result != -1) + goto done; - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - if (is_sync_frame (sink, buf)) { - /* found a keyframe, return its position */ - GST_LOG_OBJECT (sink, "found keyframe at %d", i); - result = i; - goto done; - } - } GST_LOG_OBJECT (sink, "no keyframe found"); /* throw client to the waiting state */ client->bufpos = -1; + /* and make client sync to next keyframe */ + client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + break; + } + case GST_SYNC_METHOD_BURST: + { + gboolean ok; + gint max; + + /* move to the position where we satisfy the client's burst + * parameters. If we could not satisfy the parameters because there + * is not enough data, we just send what we have (which is in result). + * We use the max value to limit the search + */ + ok = count_burst_unit (sink, &result, client->burst_min_unit, + client->burst_min_value, &max, client->burst_max_unit, + client->burst_max_value); + + GST_LOG_OBJECT (sink, "min %d, max %d", result, max); + + /* we hit the max and it is below the min, use that then */ + if (max != -1 && max <= result) { + result = MAX (max - 1, 0); + } + break; + } + case GST_SYNC_METHOD_BURST_KEYFRAME: + { + gboolean ok; + gint min_idx, max_idx; + gint next_syncframe, prev_syncframe; + + /* BURST_KEYFRAME: + * + * _always_ start sending a keyframe to the client. We first search + * a keyframe between min/max limits. If there is none, we send it the + * last keyframe before min. If there is none, the behaviour is like + * NEXT_KEYFRAME. + */ + /* gather burst limits */ + ok = count_burst_unit (sink, &min_idx, client->burst_min_unit, + client->burst_min_value, &max_idx, client->burst_max_unit, + client->burst_max_value); + + GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); + + /* first find a keyframe after min_idx */ + next_syncframe = find_next_syncframe (sink, min_idx); + if (next_syncframe != -1 && next_syncframe < max_idx) { + /* we have a valid keyframe and it's below the max */ + GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); + result = next_syncframe; + break; + } + + /* no valid keyframe, try to find one below min */ + prev_syncframe = find_prev_syncframe (sink, min_idx); + if (prev_syncframe != -1) { + GST_WARNING_OBJECT (sink, + "using keyframe below min in BURST_KEYFRAME sync mode"); + result = prev_syncframe; + break; + } + + /* no prev keyframe or not enough data */ + GST_WARNING_OBJECT (sink, + "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); + + /* throw client to the waiting state */ + client->bufpos = -1; + /* and make client sync to next keyframe */ + client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + result = -1; + break; + } + case GST_SYNC_METHOD_BURST_WITH_KEYFRAME: + { + gboolean ok; + gint min_idx, max_idx; + gint next_syncframe; + + /* BURST_WITH_KEYFRAME: + * + * try to start sending a keyframe to the client. We first search + * a keyframe between min/max limits. If there is none, we send it the + * amount of data up 'till min. + */ + /* gather enough data to burst */ + ok = count_burst_unit (sink, &min_idx, client->burst_min_unit, + client->burst_min_value, &max_idx, client->burst_max_unit, + client->burst_max_value); + + GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); + + /* first find a keyframe after min_idx */ + next_syncframe = find_next_syncframe (sink, min_idx); + if (next_syncframe != -1 && next_syncframe < max_idx) { + /* we have a valid keyframe and it's below the max */ + GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); + result = next_syncframe; + break; + } + + /* no keyframe, send data from min_idx */ + GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode"); + + /* make sure we don't go over the max limit */ + if (max_idx != -1 && max_idx <= min_idx) { + result = MAX (max_idx - 1, 0); + } else { + result = min_idx; + } + break; } default: - /* no syncing, we are happy with whatever the client is going to get */ - GST_LOG_OBJECT (sink, "no client syn needed"); + g_warning ("unknown sync method %d", client->sync_method); result = client->bufpos; break; } @@ -1332,16 +1750,9 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* nothing serious, resource was unavailable, try again later */ more = FALSE; } else if (errno == ECONNRESET) { - GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", - fd); - client->status = GST_CLIENT_STATUS_CLOSED; - return FALSE; + goto connection_reset; } else { - GST_WARNING_OBJECT (sink, - "[fd %5d] could not write, removing client: %s (%d)", fd, - g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - return FALSE; + goto write_error; } } else { if (wrote < maxsize) { @@ -1366,6 +1777,22 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, } while (more); return TRUE; + + /* ERRORS */ +connection_reset: + { + GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); + client->status = GST_CLIENT_STATUS_CLOSED; + return FALSE; + } +write_error: + { + GST_WARNING_OBJECT (sink, + "[fd %5d] could not write, removing client: %s (%d)", fd, + g_strerror (errno), errno); + client->status = GST_CLIENT_STATUS_ERROR; + return FALSE; + } } /* calculate the new position for a client after recovery. This function @@ -1509,10 +1936,29 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) } } + /* make sure we respect bytes-min, buffers-min and time-min when they are set */ + { + gint usage, max; + + GST_LOG_OBJECT (sink, + "extending queue %d to respect time_min %" GST_TIME_FORMAT + ", bytes_min %d, buffers_min %d", max_buffer_usage, + GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min); + + /* get index where the limits are ok, we don't really care if all limits + * are ok, we just queue as much as we need. We also don't compare against + * the max limits. */ + find_limits (sink, &usage, sink->bytes_min, sink->buffers_min, + sink->time_min, &max, -1, -1, -1); + + max_buffer_usage = MAX (max_buffer_usage, usage + 1); + GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage); + } + /* now look for sync points and make sure there is at least one - * sync point in the queue. We only do this if the burst mode - * is enabled. */ - if (sink->sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) { + * sync point in the queue. We only do this if the LATEST_KEYFRAME + * mode is selected */ + if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME) { /* no point in searching beyond the queue length */ gint limit = queuelen; GstBuffer *buf; @@ -1535,6 +1981,8 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage); } + GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage); + /* nobody is referencing units after max_buffer_usage so we can * remove them from the queue. We remove them in reverse order as * this is the most optimal for GArray. */ @@ -1737,6 +2185,7 @@ static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstMultiFdSink *sink; + gboolean in_caps; GstCaps *bufcaps, *padcaps; sink = GST_MULTI_FD_SINK (bsink); @@ -1769,12 +2218,14 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN), GST_FLOW_ERROR); - GST_LOG_OBJECT (sink, "received buffer %p", buf); + in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS); + + GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %d", buf, in_caps); + /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS, * it means we're getting new streamheader buffers, and we should clear * the old ones */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) && - sink->previous_buffer_in_caps == FALSE) { + if (in_caps && sink->previous_buffer_in_caps == FALSE) { GST_DEBUG_OBJECT (sink, "receiving new IN_CAPS buffers, clearing old streamheader"); g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL); @@ -1782,6 +2233,9 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) sink->streamheader = NULL; } + /* save the current in_caps */ + sink->previous_buffer_in_caps = in_caps; + /* if the incoming buffer is marked as IN CAPS, then we assume for now * it's a streamheader that needs to be sent to each new client, so we * put it on our internal list of streamheader buffers. @@ -1790,21 +2244,18 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) * * We don't send the buffer to the client, since streamheaders are sent * separately when necessary. */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) { - sink->previous_buffer_in_caps = TRUE; + if (in_caps) { GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %d to streamheader", GST_BUFFER_SIZE (buf)); sink->streamheader = g_slist_append (sink->streamheader, buf); - return GST_FLOW_OK; + } else { + /* queue the buffer, this is a regular data buffer. */ + gst_multi_fd_sink_queue_buffer (sink, buf); + + sink->bytes_to_serve += GST_BUFFER_SIZE (buf); } - sink->previous_buffer_in_caps = FALSE; - /* queue the buffer */ - gst_multi_fd_sink_queue_buffer (sink, buf); - - sink->bytes_to_serve += GST_BUFFER_SIZE (buf); - return GST_FLOW_OK; } @@ -1814,39 +2265,53 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, { GstMultiFdSink *multifdsink; - g_return_if_fail (GST_IS_MULTI_FD_SINK (object)); multifdsink = GST_MULTI_FD_SINK (object); switch (prop_id) { - case ARG_PROTOCOL: + case PROP_PROTOCOL: multifdsink->protocol = g_value_get_enum (value); break; - case ARG_MODE: + case PROP_MODE: multifdsink->mode = g_value_get_enum (value); break; - case ARG_BUFFERS_MAX: + case PROP_BUFFERS_MAX: multifdsink->units_max = g_value_get_int (value); break; - case ARG_BUFFERS_SOFT_MAX: + case PROP_BUFFERS_SOFT_MAX: multifdsink->units_soft_max = g_value_get_int (value); break; - case ARG_UNIT_TYPE: + case PROP_TIME_MIN: + multifdsink->time_min = g_value_get_int64 (value); + break; + case PROP_BYTES_MIN: + multifdsink->bytes_min = g_value_get_int (value); + break; + case PROP_BUFFERS_MIN: + multifdsink->buffers_min = g_value_get_int (value); + break; + case PROP_UNIT_TYPE: multifdsink->unit_type = g_value_get_enum (value); break; - case ARG_UNITS_MAX: + case PROP_UNITS_MAX: multifdsink->units_max = g_value_get_int (value); break; - case ARG_UNITS_SOFT_MAX: + case PROP_UNITS_SOFT_MAX: multifdsink->units_soft_max = g_value_get_int (value); break; - case ARG_RECOVER_POLICY: + case PROP_RECOVER_POLICY: multifdsink->recover_policy = g_value_get_enum (value); break; - case ARG_TIMEOUT: + case PROP_TIMEOUT: multifdsink->timeout = g_value_get_uint64 (value); break; - case ARG_SYNC_METHOD: - multifdsink->sync_method = g_value_get_enum (value); + case PROP_SYNC_METHOD: + multifdsink->def_sync_method = g_value_get_enum (value); + break; + case PROP_BURST_UNIT: + multifdsink->def_burst_unit = g_value_get_enum (value); + break; + case PROP_BURST_VALUE: + multifdsink->def_burst_value = g_value_get_uint64 (value); break; default: @@ -1861,55 +2326,69 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, { GstMultiFdSink *multifdsink; - g_return_if_fail (GST_IS_MULTI_FD_SINK (object)); multifdsink = GST_MULTI_FD_SINK (object); switch (prop_id) { - case ARG_PROTOCOL: + case PROP_PROTOCOL: g_value_set_enum (value, multifdsink->protocol); break; - case ARG_MODE: + case PROP_MODE: g_value_set_enum (value, multifdsink->mode); break; - case ARG_BUFFERS_MAX: + case PROP_BUFFERS_MAX: g_value_set_int (value, multifdsink->units_max); break; - case ARG_BUFFERS_SOFT_MAX: + case PROP_BUFFERS_SOFT_MAX: g_value_set_int (value, multifdsink->units_soft_max); break; - case ARG_BUFFERS_QUEUED: + case PROP_TIME_MIN: + g_value_set_int64 (value, multifdsink->time_min); + break; + case PROP_BYTES_MIN: + g_value_set_int (value, multifdsink->bytes_min); + break; + case PROP_BUFFERS_MIN: + g_value_set_int (value, multifdsink->buffers_min); + break; + case PROP_BUFFERS_QUEUED: g_value_set_uint (value, multifdsink->buffers_queued); break; - case ARG_BYTES_QUEUED: + case PROP_BYTES_QUEUED: g_value_set_uint (value, multifdsink->bytes_queued); break; - case ARG_TIME_QUEUED: + case PROP_TIME_QUEUED: g_value_set_uint64 (value, multifdsink->time_queued); break; - case ARG_UNIT_TYPE: + case PROP_UNIT_TYPE: g_value_set_enum (value, multifdsink->unit_type); break; - case ARG_UNITS_MAX: + case PROP_UNITS_MAX: g_value_set_int (value, multifdsink->units_max); break; - case ARG_UNITS_SOFT_MAX: + case PROP_UNITS_SOFT_MAX: g_value_set_int (value, multifdsink->units_soft_max); break; - case ARG_RECOVER_POLICY: + case PROP_RECOVER_POLICY: g_value_set_enum (value, multifdsink->recover_policy); break; - case ARG_TIMEOUT: + case PROP_TIMEOUT: g_value_set_uint64 (value, multifdsink->timeout); break; - case ARG_SYNC_METHOD: - g_value_set_enum (value, multifdsink->sync_method); + case PROP_SYNC_METHOD: + g_value_set_enum (value, multifdsink->def_sync_method); break; - case ARG_BYTES_TO_SERVE: + case PROP_BYTES_TO_SERVE: g_value_set_uint64 (value, multifdsink->bytes_to_serve); break; - case ARG_BYTES_SERVED: + case PROP_BYTES_SERVED: g_value_set_uint64 (value, multifdsink->bytes_served); break; + case PROP_BURST_UNIT: + g_value_set_enum (value, multifdsink->def_burst_unit); + break; + case PROP_BURST_VALUE: + g_value_set_uint64 (value, multifdsink->def_burst_value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -2086,6 +2565,7 @@ gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition) /* ERRORS */ start_failed: { + /* error message was posted */ return GST_STATE_CHANGE_FAILURE; } } diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 7e615736f6..7533424597 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -73,9 +73,15 @@ typedef enum /** * GstSyncMethod: - * @GST_SYNC_METHOD_LATEST : client receives most recent buffer - * @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe - * @GST_SYNC_METHOD_LATEST_KEYFRAME: client receives latest keyframe (burst) + * @GST_SYNC_METHOD_LATEST : client receives most recent buffer + * @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe + * @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst) + * @GST_SYNC_METHOD_BURST : client receives specific amount of data + * @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data + * starting from latest keyframe + * @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from + * a keyframe, or if there is not enough data after + * the keyframe, starting before the keyframe * * This enum defines the selection of the first buffer that is sent * to a new client. @@ -85,18 +91,23 @@ typedef enum GST_SYNC_METHOD_LATEST, GST_SYNC_METHOD_NEXT_KEYFRAME, GST_SYNC_METHOD_LATEST_KEYFRAME, + GST_SYNC_METHOD_BURST, + GST_SYNC_METHOD_BURST_KEYFRAME, + GST_SYNC_METHOD_BURST_WITH_KEYFRAME, } GstSyncMethod; /** * GstUnitType: - * @GST_UNIT_TYPE_BUFFERS: a buffer - * @GST_UNIT_TYPE_TIME : timeunits (in nanoseconds) - * @GST_UNIT_TYPE_BYTES : bytes + * @GST_UNIT_TYPE_UNDEFINED: undefined + * @GST_UNIT_TYPE_BUFFERS : buffers + * @GST_UNIT_TYPE_TIME : timeunits (in nanoseconds) + * @GST_UNIT_TYPE_BYTES : bytes * * The units used to specify limits. */ typedef enum { + GST_UNIT_TYPE_UNDEFINED, GST_UNIT_TYPE_BUFFERS, GST_UNIT_TYPE_TIME, GST_UNIT_TYPE_BYTES, @@ -144,6 +155,13 @@ typedef struct { gboolean caps_sent; gboolean new_connection; + /* method to sync client when connecting */ + GstSyncMethod sync_method; + GstUnitType burst_min_unit; + guint64 burst_min_value; + GstUnitType burst_max_unit; + guint64 burst_max_value; + GstCaps *caps; /* caps of last queued buffer */ /* stats */ @@ -192,12 +210,24 @@ struct _GstMultiFdSink { gboolean running; /* the thread state */ GThread *thread; /* the sender thread */ + /* these values are used to check if a client is reading fast + * enough and to control receovery */ GstUnitType unit_type;/* the type of the units */ - gint units_max; /* max units to queue */ + gint units_max; /* max units to queue for a client */ gint units_soft_max; /* max units a client can lag before recovery starts */ GstRecoverPolicy recover_policy; GstClockTime timeout; /* max amount of nanoseconds to remain idle */ - GstSyncMethod sync_method; /* what method to use for connecting clients */ + + GstSyncMethod def_sync_method; /* what method to use for connecting clients */ + GstUnitType def_burst_unit; + guint64 def_burst_value; + + /* these values are used to control the amount of data + * kept in the queues. It allows clients to perform a burst + * on connect. */ + gint bytes_min; /* min number of bytes to queue */ + gint64 time_min; /* min time to queue */ + gint buffers_min; /* min number of buffers to queue */ /* stats */ gint buffers_queued; /* number of queued buffers */ @@ -212,6 +242,9 @@ struct _GstMultiFdSinkClass { /* element methods */ void (*add) (GstMultiFdSink *sink, int fd); + void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, + GstUnitType format, guint64 value, + GstUnitType max_unit, guint64 max_value); void (*remove) (GstMultiFdSink *sink, int fd); void (*clear) (GstMultiFdSink *sink); GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); @@ -231,10 +264,14 @@ struct _GstMultiFdSinkClass { GType gst_multi_fd_sink_get_type (void); void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); +void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, + GstUnitType min_unit, guint64 min_value, + GstUnitType max_unit, guint64 max_value); void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_clear (GstMultiFdSink *sink); GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); + G_END_DECLS #endif /* __GST_MULTI_FD_SINK_H__ */ diff --git a/gst/tcp/gsttcp-marshal.list b/gst/tcp/gsttcp-marshal.list index 49f4e527a9..3a7fa9b294 100644 --- a/gst/tcp/gsttcp-marshal.list +++ b/gst/tcp/gsttcp-marshal.list @@ -1,4 +1,5 @@ VOID:STRING,UINT VOID:INT VOID:INT,BOXED +VOID:INT,BOOLEAN,INT,UINT64,INT,UINT64 BOXED:INT diff --git a/tests/check/elements/multifdsink.c b/tests/check/elements/multifdsink.c index 2cc8e7fd0c..024b27687d 100644 --- a/tests/check/elements/multifdsink.c +++ b/tests/check/elements/multifdsink.c @@ -125,6 +125,7 @@ GST_START_TEST (test_add_client) memcpy (GST_BUFFER_DATA (buffer), "dead", 4); fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + GST_DEBUG ("reading"); fail_if (read (pfd[0], data, 4) < 4); fail_unless (strncmp (data, "dead", 4) == 0); wait_bytes_served (sink, 4); @@ -441,6 +442,358 @@ GST_START_TEST (test_change_streamheader) GST_END_TEST; +/* keep 100 bytes and burst 80 bytes to clients */ +GST_START_TEST (test_burst_client_bytes) +{ + GstElement *sink; + GstBuffer *buffer; + GstCaps *caps; + int pfd1[2]; + int pfd2[2]; + int pfd3[2]; + gchar data[16]; + guint64 bytes_served; + gint i; + guint buffers_queued; + + sink = setup_multifdsink (); + /* make sure we keep at least 100 bytes at all times */ + g_object_set (sink, "bytes-min", 100, NULL); + g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */ + g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "burst-value", (guint64) 80, NULL); + + fail_if (pipe (pfd1) == -1); + fail_if (pipe (pfd2) == -1); + fail_if (pipe (pfd3) == -1); + + ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + + caps = gst_caps_from_string ("application/x-gst-check"); + GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); + + /* push buffers in, 9 * 16 bytes = 144 bytes */ + for (i = 0; i < 9; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* check that at least 7 buffers (112 bytes) are in the queue */ + g_object_get (sink, "buffers-queued", &buffers_queued, NULL); + fail_if (buffers_queued != 7); + + /* now add the clients */ + g_signal_emit_by_name (sink, "add", pfd1[1]); + g_signal_emit_by_name (sink, "add_full", pfd2[1], 3, + 3, (guint64) 50, 3, (guint64) 200); + g_signal_emit_by_name (sink, "add_full", pfd3[1], 3, + 3, (guint64) 50, 3, (guint64) 50); + + /* push last buffer to make client fds ready for reading */ + for (i = 9; i < 10; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */ + GST_DEBUG ("Reading from client 1"); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000005", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000006", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since + * the max alows it) */ + GST_DEBUG ("Reading from client 2"); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000006", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* third client only bursts 50 bytes = 4 buffers, we can't send + * more than 50 bytes so we only get 3 buffers (48 bytes). */ + GST_DEBUG ("Reading from client 3"); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + GST_DEBUG ("cleaning up multifdsink"); + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_multifdsink (sink); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); +} + +GST_END_TEST; + +/* keep 100 bytes and burst 80 bytes to clients */ +GST_START_TEST (test_burst_client_bytes_keyframe) +{ + GstElement *sink; + GstBuffer *buffer; + GstCaps *caps; + int pfd1[2]; + int pfd2[2]; + int pfd3[2]; + gchar data[16]; + guint64 bytes_served; + gint i; + guint buffers_queued; + + sink = setup_multifdsink (); + /* make sure we keep at least 100 bytes at all times */ + g_object_set (sink, "bytes-min", 100, NULL); + g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */ + g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "burst-value", (guint64) 80, NULL); + + fail_if (pipe (pfd1) == -1); + fail_if (pipe (pfd2) == -1); + fail_if (pipe (pfd3) == -1); + + ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + + caps = gst_caps_from_string ("application/x-gst-check"); + GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); + + /* push buffers in, 9 * 16 bytes = 144 bytes */ + for (i = 0; i < 9; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + + /* mark most buffers as delta */ + if (i != 0 && i != 4 && i != 8) + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* check that at least 7 buffers (112 bytes) are in the queue */ + g_object_get (sink, "buffers-queued", &buffers_queued, NULL); + fail_if (buffers_queued != 7); + + /* now add the clients */ + g_signal_emit_by_name (sink, "add", pfd1[1]); + g_signal_emit_by_name (sink, "add_full", pfd2[1], 4, + 3, (guint64) 50, 3, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", pfd3[1], 4, + 3, (guint64) 50, 3, (guint64) 50); + + /* push last buffer to make client fds ready for reading */ + for (i = 9; i < 10; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes), + * keyframe at buffer 4 */ + GST_DEBUG ("Reading from client 1"); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000004", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000005", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000006", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* second client only bursts 50 bytes = 4 buffers, there is + * no keyframe above min and below max, so get one below min */ + GST_DEBUG ("Reading from client 2"); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* third client only bursts 50 bytes = 4 buffers, we can't send + * more than 50 bytes so we only get 2 buffers (32 bytes). */ + GST_DEBUG ("Reading from client 3"); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + GST_DEBUG ("cleaning up multifdsink"); + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_multifdsink (sink); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); +} + +GST_END_TEST; + +/* keep 100 bytes and burst 80 bytes to clients */ +GST_START_TEST (test_burst_client_bytes_with_keyframe) +{ + GstElement *sink; + GstBuffer *buffer; + GstCaps *caps; + int pfd1[2]; + int pfd2[2]; + int pfd3[2]; + gchar data[16]; + guint64 bytes_served; + gint i; + guint buffers_queued; + + sink = setup_multifdsink (); + /* make sure we keep at least 100 bytes at all times */ + g_object_set (sink, "bytes-min", 100, NULL); + g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */ + g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */ + g_object_set (sink, "burst-value", (guint64) 80, NULL); + + fail_if (pipe (pfd1) == -1); + fail_if (pipe (pfd2) == -1); + fail_if (pipe (pfd3) == -1); + + ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + + caps = gst_caps_from_string ("application/x-gst-check"); + GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); + + /* push buffers in, 9 * 16 bytes = 144 bytes */ + for (i = 0; i < 9; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + + /* mark most buffers as delta */ + if (i != 0 && i != 4 && i != 8) + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* check that at least 7 buffers (112 bytes) are in the queue */ + g_object_get (sink, "buffers-queued", &buffers_queued, NULL); + fail_if (buffers_queued != 7); + + /* now add the clients */ + g_signal_emit_by_name (sink, "add", pfd1[1]); + g_signal_emit_by_name (sink, "add_full", pfd2[1], 5, + 3, (guint64) 50, 3, (guint64) 90); + g_signal_emit_by_name (sink, "add_full", pfd3[1], 5, + 3, (guint64) 50, 3, (guint64) 50); + + /* push last buffer to make client fds ready for reading */ + for (i = 9; i < 10; i++) { + gchar *data; + + buffer = gst_buffer_new_and_alloc (16); + gst_buffer_set_caps (buffer, caps); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT); + + /* copy some id */ + data = (gchar *) GST_BUFFER_DATA (buffer); + g_snprintf (data, 16, "deadbee%08x", i); + + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes), + * keyframe at buffer 4 */ + GST_DEBUG ("Reading from client 1"); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000004", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000005", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000006", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd1[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* second client only bursts 50 bytes = 4 buffers, there is + * no keyframe above min and below max, so send min */ + GST_DEBUG ("Reading from client 2"); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000006", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd2[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + /* third client only bursts 50 bytes = 4 buffers, we can't send + * more than 50 bytes so we only get 3 buffers (48 bytes). */ + GST_DEBUG ("Reading from client 3"); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000007", 16) == 0); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000008", 16) == 0); + fail_if (read (pfd3[0], data, 16) < 16); + fail_unless (strncmp (data, "deadbee00000009", 16) == 0); + + GST_DEBUG ("cleaning up multifdsink"); + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_multifdsink (sink); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); +} + +GST_END_TEST; + /* FIXME: add test simulating chained oggs where: * sync-method is burst-on-connect * (when multifdsink actually does burst-on-connect based on byte size, not @@ -448,7 +801,6 @@ GST_END_TEST; * an old client still needs to read from before the new streamheaders * a new client gets the new streamheaders */ - Suite * multifdsink_suite (void) { @@ -460,6 +812,9 @@ multifdsink_suite (void) tcase_add_test (tc_chain, test_add_client); tcase_add_test (tc_chain, test_streamheader); tcase_add_test (tc_chain, test_change_streamheader); + tcase_add_test (tc_chain, test_burst_client_bytes); + tcase_add_test (tc_chain, test_burst_client_bytes_keyframe); + tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe); return s; }