gst/tcp/gstmultifdsink.*: Added more stats, added timeout for a client, fixed some typos and added some comments.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_add),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_chain),
(gst_multifdsink_set_property), (gst_multifdsink_get_property),
(gst_multifdsink_init_send):
* gst/tcp/gstmultifdsink.h:
Added more stats, added timeout for a client, fixed some typos
and added some comments.
This commit is contained in:
Wim Taymans 2004-07-20 09:55:04 +00:00
parent dec0f7adb9
commit f087fddb7e
3 changed files with 101 additions and 30 deletions

View file

@ -1,3 +1,16 @@
2004-07-20 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_add),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_queue_buffer), (gst_multifdsink_chain),
(gst_multifdsink_set_property), (gst_multifdsink_get_property),
(gst_multifdsink_init_send):
* gst/tcp/gstmultifdsink.h:
Added more stats, added timeout for a client, fixed some typos
and added some comments.
2004-07-20 Wim Taymans <wim@fluendo.com> 2004-07-20 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init), * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init),

View file

@ -32,6 +32,8 @@
#include "gstmultifdsink.h" #include "gstmultifdsink.h"
#include "gsttcp-marshal.h" #include "gsttcp-marshal.h"
/* the select call is also performed on the control sockets, that way
* we can send special commands to unblock or restart the select call */
#define CONTROL_RESTART 'R' /* restart the select call */ #define CONTROL_RESTART 'R' /* restart the select call */
#define CONTROL_STOP 'S' /* stop the select call */ #define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(sink) sink->control_sock #define CONTROL_SOCKETS(sink) sink->control_sock
@ -50,10 +52,11 @@ G_STMT_START { \
/* elementfactory information */ /* elementfactory information */
static GstElementDetails gst_multifdsink_details = static GstElementDetails gst_multifdsink_details =
GST_ELEMENT_DETAILS ("TCP Server sink", GST_ELEMENT_DETAILS ("MultiFd sink",
"Sink/Network", "Sink/Network",
"Send data as a server over the network via TCP", "Send data to multiple filedescriptors",
"Thomas Vander Stichele <thomas at apestaart dot org>"); "Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
GST_DEBUG_CATEGORY (multifdsink_debug); GST_DEBUG_CATEGORY (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug) #define GST_CAT_DEFAULT (multifdsink_debug)
@ -66,15 +69,20 @@ enum
SIGNAL_REMOVE, SIGNAL_REMOVE,
SIGNAL_CLEAR, SIGNAL_CLEAR,
SIGNAL_GET_STATS, SIGNAL_GET_STATS,
/* signals */ /* signals */
SIGNAL_CLIENT_ADDED, SIGNAL_CLIENT_ADDED,
SIGNAL_CLIENT_REMOVED, SIGNAL_CLIENT_REMOVED,
LAST_SIGNAL LAST_SIGNAL
}; };
/* this is really arbitrary choosen */ /* this is really arbitrary choosen */
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
#define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT 0
enum enum
{ {
@ -84,6 +92,9 @@ enum
ARG_BUFFERS_SOFT_MAX, ARG_BUFFERS_SOFT_MAX,
ARG_BUFFERS_QUEUED, ARG_BUFFERS_QUEUED,
ARG_RECOVER_POLICY, ARG_RECOVER_POLICY,
ARG_TIMEOUT,
ARG_BYTES_TO_SERVE,
ARG_BYTES_SERVED,
}; };
#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type())
@ -179,8 +190,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max", g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue (-1 = no limit)", -1, G_MAXINT, "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
@ -191,12 +201,24 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
g_param_spec_int ("buffers-queued", "Buffers queued", g_param_spec_int ("buffers-queued", "Buffers queued",
"Number of buffers current queued", 0, G_MAXINT, 0, "Number of buffers currently queued", 0, G_MAXINT, 0,
G_PARAM_READABLE)); G_PARAM_READABLE));
g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
g_param_spec_enum ("recover-policy", "Recover Policy", g_param_spec_enum ("recover-policy", "Recover Policy",
"How to recover when client reaches the soft max", "How to recover when client reaches the soft max",
GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE)); GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
g_param_spec_uint64 ("timeout", "Timeout",
"Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE,
g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
"Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED,
g_param_spec_uint64 ("bytes-served", "Bytes served",
"Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
gst_multifdsink_signals[SIGNAL_ADD] = gst_multifdsink_signals[SIGNAL_ADD] =
g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
@ -250,16 +272,17 @@ gst_multifdsink_init (GstMultiFdSink * this)
GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; this->protocol = DEFAULT_PROTOCOL;
this->clientslock = g_mutex_new ();
this->clients = NULL; this->clients = NULL;
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->buffers_max = DEFAULT_BUFFERS_MAX; this->buffers_max = DEFAULT_BUFFERS_MAX;
this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX;
this->recover_policy = DEFAULT_RECOVER_POLICY;
this->clientslock = g_mutex_new (); this->timeout = DEFAULT_TIMEOUT;
this->recover_policy = GST_RECOVER_POLICY_NONE;
} }
static void static void
@ -293,6 +316,8 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
/* update start time */ /* update start time */
g_get_current_time (&now); g_get_current_time (&now);
client->connect_time = GST_TIMEVAL_TO_TIME (now); client->connect_time = GST_TIMEVAL_TO_TIME (now);
/* send last activity time to connect time */
client->last_activity_time = GST_TIMEVAL_TO_TIME (now);
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
@ -404,8 +429,6 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
} }
SEND_COMMAND (sink, CONTROL_RESTART); SEND_COMMAND (sink, CONTROL_RESTART);
sink->clients = g_list_remove (sink->clients, client);
g_get_current_time (&now); g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
client->connect_interval = client->disconnect_time = client->connect_time; client->connect_interval = client->disconnect_time = client->connect_time;
@ -413,6 +436,8 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
g_signal_emit (G_OBJECT (sink), g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd);
sink->clients = g_list_remove (sink->clients, client);
g_free (client); g_free (client);
} }
@ -552,6 +577,11 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
int fd = client->fd; int fd = client->fd;
gboolean more; gboolean more;
gboolean res; gboolean res;
GstClockTime now;
GTimeVal nowtv;
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
/* when using GDP, first check if we have queued caps yet */ /* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
@ -643,12 +673,12 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
fd); fd);
return FALSE; return FALSE;
} }
} else if (wrote < maxsize) { } else {
if (wrote < maxsize) {
/* partial write means that the client cannot read more and we should /* partial write means that the client cannot read more and we should
* stop sending more */ * stop sending more */
GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote);
client->bufoffset += wrote; client->bufoffset += wrote;
client->bytes_sent += wrote;
more = FALSE; more = FALSE;
} else { } else {
/* complete buffer was written, we can proceed to the next one */ /* complete buffer was written, we can proceed to the next one */
@ -656,7 +686,11 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
gst_buffer_unref (head); gst_buffer_unref (head);
/* make sure we start from byte 0 for the next buffer */ /* make sure we start from byte 0 for the next buffer */
client->bufoffset = 0; client->bufoffset = 0;
}
/* update stats */
client->bytes_sent += wrote; client->bytes_sent += wrote;
client->last_activity_time = now;
sink->bytes_served += wrote;
} }
} }
} while (more); } while (more);
@ -732,6 +766,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
gboolean need_signal = FALSE; gboolean need_signal = FALSE;
gint max_buffer_usage; gint max_buffer_usage;
gint i; gint i;
GTimeVal nowtv;
GstClockTime now;
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
/* add buffer to queue */ /* add buffer to queue */
@ -763,8 +802,10 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
"client %p with fd %d not recovering position", client, client->fd); "client %p with fd %d not recovering position", client, client->fd);
} }
} }
/* check hard max, remove client */ /* check hard max and timeout, remove client */
if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) { if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) ||
(sink->timeout > 0
&& now - client->last_activity_time > sink->timeout)) {
/* remove client */ /* remove client */
GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing", GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing",
client, client->fd); client, client->fd);
@ -943,6 +984,8 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink)
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
} }
/* we handle the client communication in another thread so that we do not block
* the gstreamer thread while we select() on the client fds */
static gpointer static gpointer
gst_multifdsink_thread (GstMultiFdSink * sink) gst_multifdsink_thread (GstMultiFdSink * sink)
{ {
@ -986,7 +1029,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
/* queue the buffer */ /* queue the buffer */
gst_multifdsink_queue_buffer (sink, buf); gst_multifdsink_queue_buffer (sink, buf);
sink->data_written += GST_BUFFER_SIZE (buf); sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
} }
static void static void
@ -1011,6 +1054,9 @@ gst_multifdsink_set_property (GObject * object, guint prop_id,
case ARG_RECOVER_POLICY: case ARG_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value); multifdsink->recover_policy = g_value_get_enum (value);
break; break;
case ARG_TIMEOUT:
multifdsink->timeout = g_value_get_uint64 (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -1043,6 +1089,15 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
case ARG_RECOVER_POLICY: case ARG_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy); g_value_set_enum (value, multifdsink->recover_policy);
break; break;
case ARG_TIMEOUT:
g_value_set_uint64 (value, multifdsink->timeout);
break;
case ARG_BYTES_TO_SERVE:
g_value_set_uint64 (value, multifdsink->bytes_to_serve);
break;
case ARG_BYTES_SERVED:
g_value_set_uint64 (value, multifdsink->bytes_served);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -1070,7 +1125,8 @@ gst_multifdsink_init_send (GstMultiFdSink * this)
fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK); fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK);
this->streamheader = NULL; this->streamheader = NULL;
this->data_written = 0; this->bytes_to_serve = 0;
this->bytes_served = 0;
if (fclass->init) { if (fclass->init) {
fclass->init (this); fclass->init (this);

View file

@ -96,6 +96,7 @@ typedef struct {
guint64 connect_time; guint64 connect_time;
guint64 disconnect_time; guint64 disconnect_time;
guint64 connect_interval; guint64 connect_interval;
guint64 last_activity_time;
guint64 dropped_buffers; guint64 dropped_buffers;
guint64 avg_queue_size; guint64 avg_queue_size;
@ -107,7 +108,8 @@ struct _GstMultiFdSink {
/* pad */ /* pad */
GstPad *sinkpad; GstPad *sinkpad;
size_t data_written; /* how much bytes have we written ? */ guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
GMutex *clientslock; /* lock to protect the clients list */ GMutex *clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */ GList *clients; /* list of clients we are serving */
@ -127,7 +129,7 @@ struct _GstMultiFdSink {
GThread *thread; /* the sender thread */ GThread *thread; /* the sender thread */
gint buffers_max; /* max buffers to queue */ gint buffers_max; /* max buffers to queue */
gint buffers_soft_max; /* max buffers a client can lay before recoevery starts */ gint buffers_soft_max; /* max buffers a client can lag before recovery starts */
GstRecoverPolicy recover_policy; GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */ GstClockTime timeout; /* max amount of nanoseconds to remain idle */
/* stats */ /* stats */