multihandlesink: first stab at common base class

This commit is contained in:
Thomas Vander Stichele 2012-01-26 10:49:37 +01:00
parent 684aa4baaf
commit c024274261
7 changed files with 637 additions and 955 deletions

View file

@ -17,6 +17,7 @@ libgsttcp_la_SOURCES = \
gsttcpplugin.c \
gsttcpclientsrc.c gsttcpclientsink.c \
gstmultifdsink.c \
gstmultihandlesink.c \
gstmultisocketsink.c \
gsttcpserversrc.c gsttcpserversink.c

File diff suppressed because it is too large Load diff

View file

@ -46,12 +46,6 @@ G_BEGIN_DECLS
typedef struct _GstMultiFdSink GstMultiFdSink;
typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
typedef enum {
GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiFdSinkFlags;
/**
* GstTCPUnitType:
@ -73,24 +67,13 @@ typedef enum
/* structure for a client
*/
typedef struct {
GstMultiHandleClient client;
GstPollFD fd;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
GstClientStatus status;
gboolean is_socket;
GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */
gboolean discont;
gboolean caps_sent;
gboolean new_connection;
gboolean currently_removing;
/* method to sync client when connecting */
GstSyncMethod sync_method;
@ -98,18 +81,6 @@ typedef struct {
guint64 burst_min_value;
GstTCPUnitType burst_max_unit;
guint64 burst_max_value;
GstCaps *caps; /* caps of last queued buffer */
/* stats */
guint64 bytes_sent;
guint64 connect_time;
guint64 disconnect_time;
guint64 last_activity_time;
guint64 dropped_buffers;
guint64 avg_queue_size;
guint64 first_buffer_ts;
guint64 last_buffer_ts;
} GstTCPClient;
/**
@ -118,12 +89,9 @@ typedef struct {
* The multifdsink object structure.
*/
struct _GstMultiFdSink {
GstBaseSink element;
GstMultiHandleSink element;
/*< private >*/
guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */
@ -149,20 +117,10 @@ struct _GstMultiFdSink {
GstTCPUnitType unit_type;/* the type of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */
GstSyncMethod def_sync_method; /* what method to use for connecting clients */
GstTCPUnitType def_burst_unit;
guint64 def_burst_value;
/* these values are used to control the amount of data
* kept in the queues. It allows clients to perform a burst
* on connect. */
gint bytes_min; /* min number of bytes to queue */
gint64 time_min; /* min time to queue */
gint buffers_min; /* min number of buffers to queue */
gboolean resend_streamheader; /* resend streamheader if it changes */
/* stats */
@ -174,7 +132,7 @@ struct _GstMultiFdSink {
};
struct _GstMultiFdSinkClass {
GstBaseSinkClass parent_class;
GstMultiHandleSinkClass parent_class;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
@ -206,7 +164,7 @@ void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstS
GstTCPUnitType max_unit, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
void gst_multi_fd_sink_clear (GstMultiHandleSink *sink);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
G_END_DECLS

View file

@ -22,26 +22,26 @@
*/
/**
* SECTION:element-multisocketsink
* SECTION:element-multihandlesink
* @see_also: tcpserversink
*
* This plugin writes incoming data to a set of file descriptors. The
* file descriptors can be added to multisocketsink by emitting the #GstMultiHandleSink::add signal.
* file descriptors can be added to multihandlesink by emitting the #GstMultiHandleSink::add signal.
* For each descriptor added, the #GstMultiHandleSink::client-added signal will be called.
*
* As of version 0.10.8, a client can also be added with the #GstMultiHandleSink::add-full signal
* that allows for more control over what and how much data a client
* initially receives.
*
* Clients can be removed from multisocketsink by emitting the #GstMultiHandleSink::remove signal. For
* Clients can be removed from multihandlesink by emitting the #GstMultiHandleSink::remove signal. For
* each descriptor removed, the #GstMultiHandleSink::client-removed signal will be called. The
* #GstMultiHandleSink::client-removed signal can also be fired when multisocketsink decides that a
* #GstMultiHandleSink::client-removed signal can also be fired when multihandlesink decides that a
* client is not active anymore or, depending on the value of the
* #GstMultiHandleSink:recover-policy property, if the client is reading too slowly.
* In all cases, multisocketsink will never close a file descriptor itself.
* The user of multisocketsink is responsible for closing all file descriptors.
* In all cases, multihandlesink will never close a file descriptor itself.
* The user of multihandlesink is responsible for closing all file descriptors.
* This can for example be done in response to the #GstMultiHandleSink::client-fd-removed signal.
* Note that multisocketsink still has a reference to the file descriptor when the
* Note that multihandlesink still has a reference to the file descriptor when the
* #GstMultiHandleSink::client-removed signal is emitted, so that "get-stats" can be performed on
* the descriptor; it is therefore not safe to close the file descriptor in
* the #GstMultiHandleSink::client-removed signal handler, and you should use the
@ -52,11 +52,11 @@
* client write can block the pipeline and that clients can read with different
* speeds.
*
* When adding a client to multisocketsink, the #GstMultiHandleSink:sync-method property will define
* When adding a client to multihandlesink, the #GstMultiHandleSink:sync-method property will define
* which buffer in the queued buffers will be sent first to the client. Clients
* can be sent the most recent buffer (which might not be decodable by the
* client if it is not a keyframe), the next keyframe received in
* multisocketsink (which can take some time depending on the keyframe rate), or the
* multihandlesink (which can take some time depending on the keyframe rate), or the
* last received keyframe (which will cause a simple burst-on-connect).
* Multisocketsink will always keep at least one keyframe in its internal buffers
* when the sync-mode is set to latest-keyframe.
@ -76,13 +76,13 @@
* actually be honored.
*
* When streaming data, clients are allowed to read at a different rate than
* the rate at which multisocketsink receives data. If the client is reading too
* fast, no data will be send to the client until multisocketsink receives more
* the rate at which multihandlesink receives data. If the client is reading too
* fast, no data will be send to the client until multihandlesink receives more
* data. If the client, however, reads too slowly, data for that client will be
* queued up in multisocketsink. Two properties control the amount of data
* (buffers) that is queued in multisocketsink: #GstMultiHandleSink:buffers-max and
* queued up in multihandlesink. Two properties control the amount of data
* (buffers) that is queued in multihandlesink: #GstMultiHandleSink:buffers-max and
* #GstMultiHandleSink:buffers-soft-max. A client that falls behind by
* #GstMultiHandleSink:buffers-max is removed from multisocketsink forcibly.
* #GstMultiHandleSink:buffers-max is removed from multihandlesink forcibly.
*
* A client with a lag of at least #GstMultiHandleSink:buffers-soft-max enters the recovery
* procedure which is controlled with the #GstMultiHandleSink:recover-policy property.
@ -92,7 +92,7 @@
* RESYNC_KEYFRAME positions the client at the most recent keyframe in the
* buffer queue.
*
* multisocketsink will by default synchronize on the clock before serving the
* multihandlesink will by default synchronize on the clock before serving the
* buffers to the clients. This behaviour can be disabled by setting the sync
* property to FALSE. Multisocketsink will by default not do QoS and will never
* drop late buffers.
@ -106,7 +106,7 @@
#include <gst/gst-i18n-plugin.h>
#include "gstmultisocketsink.h"
#include "gstmultihandlesink.h"
#include "gsttcp-marshal.h"
#ifndef G_OS_WIN32
@ -120,8 +120,8 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
#define GST_CAT_DEFAULT (multisocketsink_debug)
GST_DEBUG_CATEGORY_STATIC (multihandlesink_debug);
#define GST_CAT_DEFAULT (multihandlesink_debug)
/* MultiHandleSink signals and args */
enum
@ -168,6 +168,7 @@ enum
enum
{
PROP_0,
#if 0
PROP_MODE,
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
@ -179,6 +180,7 @@ enum
PROP_BUFFERS_MAX,
PROP_BUFFERS_SOFT_MAX,
#endif
PROP_TIME_MIN,
PROP_BYTES_MIN,
@ -190,6 +192,7 @@ enum
PROP_BYTES_TO_SERVE,
PROP_BYTES_SERVED,
#if 0
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
@ -200,12 +203,15 @@ enum
PROP_RESEND_STREAMHEADER,
PROP_NUM_SOCKETS,
#endif
PROP_LAST
};
#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type())
static GType
// FIXME: make static again when refactored
//#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type())
//static GType
GType
gst_multi_handle_sink_recover_policy_get_type (void)
{
static GType recover_policy_type = 0;
@ -229,8 +235,10 @@ gst_multi_handle_sink_recover_policy_get_type (void)
return recover_policy_type;
}
#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type())
static GType
// FIXME: make static again after refactoring
//#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type())
//static GType
GType
gst_multi_handle_sink_sync_method_get_type (void)
{
static GType sync_method_type = 0;
@ -259,8 +267,10 @@ gst_multi_handle_sink_sync_method_get_type (void)
return sync_method_type;
}
#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type())
static GType
// FIXME: make static again after refactoring
//#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type())
//static GType
GType
gst_multi_handle_sink_client_status_get_type (void)
{
static GType client_status_type = 0;
@ -283,19 +293,25 @@ gst_multi_handle_sink_client_status_get_type (void)
return client_status_type;
}
#if 0
static void gst_multi_handle_sink_finalize (GObject * object);
#endif
#if 0
static void gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static gboolean gst_multi_handle_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiHandleSink * sink);
#endif
#if 0
static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink);
static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement *
element, GstStateChange transition);
#endif
static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -312,26 +328,36 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
#if 0
GstBaseSinkClass *gstbasesink_class;
#endif
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
#if 0
gstbasesink_class = (GstBaseSinkClass *) klass;
#endif
gobject_class->set_property = gst_multi_handle_sink_set_property;
gobject_class->get_property = gst_multi_handle_sink_get_property;
#if 0
gobject_class->finalize = gst_multi_handle_sink_finalize;
#endif
#if 0
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue for a client (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
#if 0
g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
g_param_spec_int ("buffers-soft-max", "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
g_param_spec_int ("bytes-min", "Bytes min",
@ -349,6 +375,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_MAXINT, DEFAULT_BUFFERS_MIN,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#if 0
g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
"The unit to measure the max/soft-max/queued properties",
@ -377,6 +404,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
#endif
g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
@ -402,6 +430,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
"Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#if 0
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
g_param_spec_enum ("burst-format", "Burst format",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
@ -446,13 +475,15 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
#if 0
/**
* GstMultiHandleSink::add:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to add to multisocketsink
* @gstmultihandlesink: the multihandlesink element to emit this signal on
* @socket: the socket to add to multihandlesink
*
* Hand the given open socket to multisocketsink to write to.
* Hand the given open socket to multihandlesink to write to.
*/
gst_multi_handle_sink_signals[SIGNAL_ADD] =
g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
@ -461,8 +492,8 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
/**
* GstMultiHandleSink::add-full:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to add to multisocketsink
* @gstmultihandlesink: the multihandlesink element to emit this signal on
* @socket: the socket to add to multihandlesink
* @sync: the sync method to use
* @format_min: the format of @value_min
* @value_min: the minimum amount of data to burst expressed in
@ -471,7 +502,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
* @value_max: the maximum amount of data to burst expressed in
* @format_max units.
*
* Hand the given open socket to multisocketsink to write to and
* Hand the given open socket to multihandlesink to write to and
* specify the burst parameters for the new connection.
*/
gst_multi_handle_sink_signals[SIGNAL_ADD_BURST] =
@ -483,10 +514,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
GST_TYPE_FORMAT, G_TYPE_UINT64);
/**
* GstMultiHandleSink::remove:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to remove from multisocketsink
* @gstmultihandlesink: the multihandlesink element to emit this signal on
* @socket: the socket to remove from multihandlesink
*
* Remove the given open socket from multisocketsink.
* Remove the given open socket from multihandlesink.
*/
gst_multi_handle_sink_signals[SIGNAL_REMOVE] =
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
@ -495,10 +526,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
/**
* GstMultiHandleSink::remove-flush:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to remove from multisocketsink
* @gstmultihandlesink: the multihandlesink element to emit this signal on
* @socket: the socket to remove from multihandlesink
*
* Remove the given open socket from multisocketsink after flushing all
* Remove the given open socket from multihandlesink after flushing all
* the pending data to the socket.
*/
gst_multi_handle_sink_signals[SIGNAL_REMOVE_FLUSH] =
@ -506,11 +537,12 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstMultiHandleSinkClass, remove_flush), NULL, NULL,
g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET);
#endif
/**
* GstMultiHandleSink::clear:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @gstmultihandlesink: the multihandlesink element to emit this signal on
*
* Remove all sockets from multisocketsink. Since multisocketsink did not
* Remove all sockets from multihandlesink. Since multihandlesink did not
* open sockets itself, it does not explicitly close the sockets. The application
* should do so by connecting to the client-socket-removed callback.
*/
@ -520,10 +552,11 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_STRUCT_OFFSET (GstMultiHandleSinkClass, clear), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
#if 0
/**
* GstMultiHandleSink::get-stats:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to get stats of from multisocketsink
* @gstmultihandlesink: the multihandlesink element to emit this signal on
* @socket: the socket to get stats of from multihandlesink
*
* Get statistics about @socket. This function returns a GstStructure.
*
@ -542,10 +575,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
/**
* GstMultiHandleSink::client-added:
* @gstmultisocketsink: the multisocketsink element that emitted this signal
* @socket: the socket that was added to multisocketsink
* @gstmultihandlesink: the multihandlesink element that emitted this signal
* @socket: the socket that was added to multihandlesink
*
* The given socket was added to multisocketsink. This signal will
* The given socket was added to multihandlesink. This signal will
* be emitted from the streaming thread so application should be prepared
* for that.
*/
@ -556,15 +589,15 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_TYPE_NONE, 1, G_TYPE_OBJECT);
/**
* GstMultiHandleSink::client-removed:
* @gstmultisocketsink: the multisocketsink element that emitted this signal
* @socket: the socket that is to be removed from multisocketsink
* @gstmultihandlesink: the multihandlesink element that emitted this signal
* @socket: the socket that is to be removed from multihandlesink
* @status: the reason why the client was removed
*
* The given socket is about to be removed from multisocketsink. This
* The given socket is about to be removed from multihandlesink. This
* signal will be emitted from the streaming thread so applications should
* be prepared for that.
*
* @gstmultisocketsink still holds a handle to @socket so it is possible to call
* @gstmultihandlesink still holds a handle to @socket so it is possible to call
* the get-stats signal from this callback. For the same reason it is
* not safe to close() and reuse @socket in this callback.
*/
@ -575,14 +608,14 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
/**
* GstMultiHandleSink::client-socket-removed:
* @gstmultisocketsink: the multisocketsink element that emitted this signal
* @socket: the socket that was removed from multisocketsink
* @gstmultihandlesink: the multihandlesink element that emitted this signal
* @socket: the socket that was removed from multihandlesink
*
* The given socket was removed from multisocketsink. This signal will
* The given socket was removed from multihandlesink. This signal will
* be emitted from the streaming thread so applications should be prepared
* for that.
*
* In this callback, @gstmultisocketsink has removed all the information
* In this callback, @gstmultihandlesink has removed all the information
* associated with @socket and it is therefore not possible to call get-stats
* with @socket. It is however safe to close() and reuse @fd in the callback.
*
@ -593,6 +626,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiHandleSinkClass,
client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
G_TYPE_NONE, 1, G_TYPE_SOCKET);
#endif
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
@ -604,6 +638,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
"Wim Taymans <wim@fluendo.com>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
#if 0
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
@ -611,15 +646,18 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop);
#endif
#if 0
klass->add = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add);
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add_full);
klass->remove = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove);
klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove_flush);
klass->clear = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_clear);
klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_get_stats);
#endif
GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
GST_DEBUG_CATEGORY_INIT (multihandlesink_debug, "multihandlesink", 0,
"Multi socket sink");
}
@ -655,6 +693,7 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
this->cancellable = g_cancellable_new ();
}
#if 0
static void
gst_multi_handle_sink_finalize (GObject * object)
{
@ -754,7 +793,37 @@ setup_dscp (GstMultiHandleSink * sink)
}
CLIENTS_UNLOCK (sink);
}
#endif
void
gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
GstSyncMethod sync_method)
{
GTimeVal now;
client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1;
client->flushcount = -1;
client->bufoffset = 0;
client->sending = NULL;
client->bytes_sent = 0;
client->dropped_buffers = 0;
client->avg_queue_size = 0;
client->first_buffer_ts = GST_CLOCK_TIME_NONE;
client->last_buffer_ts = GST_CLOCK_TIME_NONE;
client->new_connection = TRUE;
client->sync_method = sync_method;
client->currently_removing = FALSE;
/* update start time */
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
client->disconnect_time = 0;
/* set last activity time to connect time */
client->last_activity_time = client->connect_time;
}
#if 0
/* "add-full" signal implementation */
void
gst_multi_handle_sink_add_full (GstMultiHandleSink * sink, GSocket * socket,
@ -779,30 +848,10 @@ gst_multi_handle_sink_add_full (GstMultiHandleSink * sink, GSocket * socket,
/* create client datastructure */
client = g_new0 (GstSocketClient, 1);
client->socket = G_SOCKET (g_object_ref (socket));
client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1;
client->flushcount = -1;
client->bufoffset = 0;
client->sending = NULL;
client->bytes_sent = 0;
client->dropped_buffers = 0;
client->avg_queue_size = 0;
client->first_buffer_ts = GST_CLOCK_TIME_NONE;
client->last_buffer_ts = GST_CLOCK_TIME_NONE;
client->new_connection = TRUE;
client->burst_min_format = min_format;
client->burst_min_value = min_value;
client->burst_max_format = max_format;
client->burst_max_value = max_value;
client->sync_method = sync_method;
client->currently_removing = FALSE;
/* update start time */
g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now);
client->disconnect_time = 0;
/* set last activity time to connect time */
client->last_activity_time = client->connect_time;
CLIENTS_LOCK (sink);
@ -984,7 +1033,7 @@ gst_multi_handle_sink_get_stats (GstMultiHandleSink * sink, GSocket * socket)
if (client != NULL) {
guint64 interval;
result = gst_structure_new_empty ("multisocketsink-stats");
result = gst_structure_new_empty ("multihandlesink-stats");
if (client->disconnect_time == 0) {
GTimeVal nowtv;
@ -1013,13 +1062,15 @@ noclient:
/* python doesn't like a NULL pointer yet */
if (result == NULL) {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket);
result = gst_structure_new_empty ("multisocketsink-stats");
result = gst_structure_new_empty ("multihandlesink-stats");
}
return result;
}
#endif
/* should be called with the clientslock helt.
#if 0
/* should be called with the clientslock held.
* Note that we don't close the fd as we didn't open it in the first
* place. An application should connect to the client-fd-removed signal and
* close the fd itself.
@ -1133,7 +1184,9 @@ gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink,
CLIENTS_LOCK (sink);
}
#endif
#if 0
/* handle a read on a client socket,
* which either indicates a close or should be ignored
* returns FALSE if some error occured or the client closed. */
@ -1187,19 +1240,9 @@ gst_multi_handle_sink_handle_client_read (GstMultiHandleSink * sink,
return ret;
}
#endif
static gboolean
is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
return TRUE;
}
return FALSE;
}
#if 0
/* queue the given buffer for the given client */
static gboolean
gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * sink,
@ -1318,6 +1361,20 @@ gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * sink,
return TRUE;
}
#endif
#if 0
static gboolean
is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
return TRUE;
}
return FALSE;
}
/* find the keyframe in the list of buffers starting the
* search from @idx. @direction as -1 will search backwards,
@ -1349,10 +1406,12 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
}
return result;
}
#endif
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
#if 0
/* Get the number of buffers from the buffer queue needed to satisfy
* the maximum max in the configured units.
* If units are not BUFFERS, and there are insufficient buffers in the
@ -1409,7 +1468,9 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max)
return max;
}
}
#endif
#if 0
/* find the positions in the buffer queue where *_min and *_max
* is satisfied
*/
@ -1553,7 +1614,9 @@ assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
}
return res;
}
#endif
#if 0
/* count the index in the buffer queue to satisfy the given unit
* and value pair starting from buffer at index 0.
*
@ -1577,7 +1640,9 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx,
return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
max_idx, bytes_max, buffers_max, time_max);
}
#endif
#if 0
/* decide where in the current buffer queue this new client should start
* receiving buffers from.
* This function is called whenever a client is connected and has not yet
@ -1778,7 +1843,9 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink,
}
return result;
}
#endif
#if 0
/* Handle a write on a client,
* which indicates a read request from a client.
*
@ -1968,7 +2035,9 @@ write_error:
return FALSE;
}
}
#endif
#if 0
/* calculate the new position for a client after recovery. This function
* does not update the client position but merely returns the required
* position.
@ -2021,7 +2090,9 @@ gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink,
}
return newbufpos;
}
#endif
#if 0
/* Queue a buffer on the global queue.
*
* This function adds the buffer to the front of a GArray. It removes the
@ -2210,7 +2281,9 @@ restart:
sink->buffers_queued = max_buffer_usage;
CLIENTS_UNLOCK (sink);
}
#endif
#if 0
/* Handle the clients. This is called when a socket becomes ready
* to read or writable. Badly behaving clients are put on a
* garbage list and removed.
@ -2271,7 +2344,9 @@ done:
return ret;
}
#endif
#if 0
static gboolean
gst_multi_handle_sink_timeout (GstMultiHandleSink * sink)
{
@ -2296,7 +2371,9 @@ gst_multi_handle_sink_timeout (GstMultiHandleSink * sink)
return FALSE;
}
#endif
#if 0
/* we handle the client communication in another thread so that we do not block
* the gstreamer thread while we select() on the client fds */
static gpointer
@ -2328,7 +2405,9 @@ gst_multi_handle_sink_thread (GstMultiHandleSink * sink)
return NULL;
}
#endif
#if 0
static GstFlowReturn
gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
@ -2435,66 +2514,72 @@ no_caps:
}
#endif
}
#endif
static void
gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstMultiHandleSink *multisocketsink;
GstMultiHandleSink *multihandlesink;
multisocketsink = GST_MULTI_HANDLE_SINK (object);
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
#if 0
case PROP_BUFFERS_MAX:
multisocketsink->units_max = g_value_get_int (value);
multihandlesink->units_max = g_value_get_int (value);
break;
case PROP_BUFFERS_SOFT_MAX:
multisocketsink->units_soft_max = g_value_get_int (value);
multihandlesink->units_soft_max = g_value_get_int (value);
break;
#endif
case PROP_TIME_MIN:
multisocketsink->time_min = g_value_get_int64 (value);
multihandlesink->time_min = g_value_get_int64 (value);
break;
case PROP_BYTES_MIN:
multisocketsink->bytes_min = g_value_get_int (value);
multihandlesink->bytes_min = g_value_get_int (value);
break;
case PROP_BUFFERS_MIN:
multisocketsink->buffers_min = g_value_get_int (value);
multihandlesink->buffers_min = g_value_get_int (value);
break;
#if 0
case PROP_UNIT_TYPE:
multisocketsink->unit_type = g_value_get_enum (value);
multihandlesink->unit_type = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multisocketsink->units_max = g_value_get_int64 (value);
multihandlesink->units_max = g_value_get_int64 (value);
break;
case PROP_UNITS_SOFT_MAX:
multisocketsink->units_soft_max = g_value_get_int64 (value);
multihandlesink->units_soft_max = g_value_get_int64 (value);
break;
#endif
case PROP_RECOVER_POLICY:
multisocketsink->recover_policy = g_value_get_enum (value);
multihandlesink->recover_policy = g_value_get_enum (value);
break;
case PROP_TIMEOUT:
multisocketsink->timeout = g_value_get_uint64 (value);
multihandlesink->timeout = g_value_get_uint64 (value);
break;
case PROP_SYNC_METHOD:
multisocketsink->def_sync_method = g_value_get_enum (value);
multihandlesink->def_sync_method = g_value_get_enum (value);
break;
#if 0
case PROP_BURST_FORMAT:
multisocketsink->def_burst_format = g_value_get_enum (value);
multihandlesink->def_burst_format = g_value_get_enum (value);
break;
case PROP_BURST_VALUE:
multisocketsink->def_burst_value = g_value_get_uint64 (value);
multihandlesink->def_burst_value = g_value_get_uint64 (value);
break;
case PROP_QOS_DSCP:
multisocketsink->qos_dscp = g_value_get_int (value);
setup_dscp (multisocketsink);
multihandlesink->qos_dscp = g_value_get_int (value);
setup_dscp (multihandlesink);
break;
case PROP_HANDLE_READ:
multisocketsink->handle_read = g_value_get_boolean (value);
multihandlesink->handle_read = g_value_get_boolean (value);
break;
case PROP_RESEND_STREAMHEADER:
multisocketsink->resend_streamheader = g_value_get_boolean (value);
multihandlesink->resend_streamheader = g_value_get_boolean (value);
break;
#endif
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2505,79 +2590,84 @@ static void
gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstMultiHandleSink *multisocketsink;
GstMultiHandleSink *multihandlesink;
multisocketsink = GST_MULTI_HANDLE_SINK (object);
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
#if 0
case PROP_BUFFERS_MAX:
g_value_set_int (value, multisocketsink->units_max);
g_value_set_int (value, multihandlesink->units_max);
break;
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multisocketsink->units_soft_max);
g_value_set_int (value, multihandlesink->units_soft_max);
break;
#endif
case PROP_TIME_MIN:
g_value_set_int64 (value, multisocketsink->time_min);
g_value_set_int64 (value, multihandlesink->time_min);
break;
case PROP_BYTES_MIN:
g_value_set_int (value, multisocketsink->bytes_min);
g_value_set_int (value, multihandlesink->bytes_min);
break;
case PROP_BUFFERS_MIN:
g_value_set_int (value, multisocketsink->buffers_min);
g_value_set_int (value, multihandlesink->buffers_min);
break;
#if 0
case PROP_BUFFERS_QUEUED:
g_value_set_uint (value, multisocketsink->buffers_queued);
g_value_set_uint (value, multihandlesink->buffers_queued);
break;
case PROP_BYTES_QUEUED:
g_value_set_uint (value, multisocketsink->bytes_queued);
g_value_set_uint (value, multihandlesink->bytes_queued);
break;
case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multisocketsink->time_queued);
g_value_set_uint64 (value, multihandlesink->time_queued);
break;
case PROP_UNIT_TYPE:
g_value_set_enum (value, multisocketsink->unit_type);
g_value_set_enum (value, multihandlesink->unit_type);
break;
case PROP_UNITS_MAX:
g_value_set_int64 (value, multisocketsink->units_max);
g_value_set_int64 (value, multihandlesink->units_max);
break;
case PROP_UNITS_SOFT_MAX:
g_value_set_int64 (value, multisocketsink->units_soft_max);
g_value_set_int64 (value, multihandlesink->units_soft_max);
break;
#endif
case PROP_RECOVER_POLICY:
g_value_set_enum (value, multisocketsink->recover_policy);
g_value_set_enum (value, multihandlesink->recover_policy);
break;
case PROP_TIMEOUT:
g_value_set_uint64 (value, multisocketsink->timeout);
g_value_set_uint64 (value, multihandlesink->timeout);
break;
case PROP_SYNC_METHOD:
g_value_set_enum (value, multisocketsink->def_sync_method);
g_value_set_enum (value, multihandlesink->def_sync_method);
break;
case PROP_BYTES_TO_SERVE:
g_value_set_uint64 (value, multisocketsink->bytes_to_serve);
g_value_set_uint64 (value, multihandlesink->bytes_to_serve);
break;
case PROP_BYTES_SERVED:
g_value_set_uint64 (value, multisocketsink->bytes_served);
g_value_set_uint64 (value, multihandlesink->bytes_served);
break;
#if 0
case PROP_BURST_FORMAT:
g_value_set_enum (value, multisocketsink->def_burst_format);
g_value_set_enum (value, multihandlesink->def_burst_format);
break;
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multisocketsink->def_burst_value);
g_value_set_uint64 (value, multihandlesink->def_burst_value);
break;
case PROP_QOS_DSCP:
g_value_set_int (value, multisocketsink->qos_dscp);
g_value_set_int (value, multihandlesink->qos_dscp);
break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multisocketsink->handle_read);
g_value_set_boolean (value, multihandlesink->handle_read);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multisocketsink->resend_streamheader);
g_value_set_boolean (value, multihandlesink->resend_streamheader);
break;
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));
g_hash_table_size (multihandlesink->socket_hash));
break;
#endif
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2585,6 +2675,7 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
}
#if 0
/* create a socket for sending to remote machine */
static gboolean
gst_multi_handle_sink_start (GstBaseSink * bsink)
@ -2643,7 +2734,9 @@ multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
{
return TRUE;
}
#endif
#if 0
static gboolean
gst_multi_handle_sink_stop (GstBaseSink * bsink)
{
@ -2671,7 +2764,7 @@ gst_multi_handle_sink_stop (GstBaseSink * bsink)
}
/* free the clients */
gst_multi_handle_sink_clear (this);
fclass->clear (this);
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
@ -2707,7 +2800,9 @@ gst_multi_handle_sink_stop (GstBaseSink * bsink)
return TRUE;
}
#endif
#if 0
static GstStateChangeReturn
gst_multi_handle_sink_change_state (GstElement * element,
GstStateChange transition)
@ -2786,3 +2881,4 @@ gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink)
return TRUE;
}
#endif

View file

@ -47,13 +47,11 @@ G_BEGIN_DECLS
typedef struct _GstMultiHandleSink GstMultiHandleSink;
typedef struct _GstMultiHandleSinkClass GstMultiHandleSinkClass;
#if 0
typedef enum {
GST_MULTI_HANDLE_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_HANDLE_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiHandleSinkFlags;
#endif
/**
* GstRecoverPolicy:
@ -122,13 +120,9 @@ typedef enum
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
#if 0
/* structure for a client
*/
typedef struct {
GSocket *socket;
GSource *source;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
@ -141,15 +135,18 @@ typedef struct {
gboolean discont;
gboolean new_connection;
gboolean currently_removing;
/* method to sync client when connecting */
GstSyncMethod sync_method;
// FIXME: refactor format vs unit
#if 0
GstFormat burst_min_format;
guint64 burst_min_value;
GstFormat burst_max_format;
guint64 burst_max_value;
#endif
GstCaps *caps; /* caps of last queued buffer */
@ -162,8 +159,7 @@ typedef struct {
guint64 avg_queue_size;
guint64 first_buffer_ts;
guint64 last_buffer_ts;
} GstSocketClient;
#endif
} GstMultiHandleClient;
#define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock))
#define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock))
@ -257,17 +253,28 @@ struct _GstMultiHandleSinkClass {
GType gst_multi_handle_sink_get_type (void);
#if 0
void gst_multi_handle_sink_add (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_add_full (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync,
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_handle_sink_remove (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_remove_flush (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_clear (GstMultiHandleSink *sink);
GstStructure* gst_multi_handle_sink_get_stats (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_client_init (GstMultiHandleClient * client, GstSyncMethod sync_method);
// FIXME: make static again after refactoring
#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type())
GType
gst_multi_handle_sink_recover_policy_get_type (void);
#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type())
GType
gst_multi_handle_sink_sync_method_get_type (void);
#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type())
GType
gst_multi_handle_sink_client_status_get_type (void);
G_END_DECLS
#endif
#endif /* __GST_MULTI_HANDLE_SINK_H__ */

File diff suppressed because it is too large Load diff

View file

@ -50,70 +50,31 @@ G_BEGIN_DECLS
typedef struct _GstMultiSocketSink GstMultiSocketSink;
typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass;
typedef enum {
GST_MULTI_SOCKET_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_SOCKET_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiSocketSinkFlags;
/* structure for a client
*/
typedef struct {
GstMultiHandleClient client;
GSocket *socket;
GSource *source;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
GstClientStatus status;
GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */
gboolean discont;
gboolean new_connection;
gboolean currently_removing;
/* method to sync client when connecting */
GstSyncMethod sync_method;
GstFormat burst_min_format;
guint64 burst_min_value;
GstFormat burst_max_format;
guint64 burst_max_value;
GstCaps *caps; /* caps of last queued buffer */
/* stats */
guint64 bytes_sent;
guint64 connect_time;
guint64 disconnect_time;
guint64 last_activity_time;
guint64 dropped_buffers;
guint64 avg_queue_size;
guint64 first_buffer_ts;
guint64 last_buffer_ts;
} GstSocketClient;
#define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock))
#define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock))
#define CLIENTS_LOCK(socketsink) (g_rec_mutex_lock(&socketsink->clientslock))
#define CLIENTS_UNLOCK(socketsink) (g_rec_mutex_unlock(&socketsink->clientslock))
/**
* GstMultiSocketSink:
*
* The multisocketsink object structure.
*/
struct _GstMultiSocketSink {
GstBaseSink element;
GstMultiHandleSink element;
/*< private >*/
guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *socket_hash; /* index on socket to client */
@ -139,20 +100,10 @@ struct _GstMultiSocketSink {
GstFormat unit_type;/* the format of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */
GstSyncMethod def_sync_method; /* what method to use for connecting clients */
GstFormat def_burst_format;
guint64 def_burst_value;
/* these values are used to control the amount of data
* kept in the queues. It allows clients to perform a burst
* on connect. */
gint bytes_min; /* min number of bytes to queue */
gint64 time_min; /* min time to queue */
gint buffers_min; /* min number of buffers to queue */
gboolean resend_streamheader; /* resend streamheader if it changes */
/* stats */
@ -164,7 +115,7 @@ struct _GstMultiSocketSink {
};
struct _GstMultiSocketSinkClass {
GstBaseSinkClass parent_class;
GstMultiHandleSinkClass parent_class;
/* element methods */
void (*add) (GstMultiSocketSink *sink, GSocket *socket);
@ -195,7 +146,7 @@ void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSoc
GstFormat max_format, guint64 max_value);
void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_clear (GstMultiSocketSink *sink);
void gst_multi_socket_sink_clear (GstMultiHandleSink *sink);
GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket);
G_END_DECLS