gst/tcp/: Starting to prepare for specifying buffer time in other units than buffers. Expose remove reason in signal.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type),
(gst_client_status_get_type), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
(gst_multifdsink_get_property):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp-marshal.list:
Starting to prepare for specifying buffer time in other units
than buffers. Expose remove reason in signal.
This commit is contained in:
Wim Taymans 2004-08-10 15:23:19 +00:00
parent 3b5ba92cea
commit 45208fed48
4 changed files with 158 additions and 34 deletions

View file

@ -1,3 +1,18 @@
2004-08-10 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type),
(gst_client_status_get_type), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_set_property),
(gst_multifdsink_get_property):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp-marshal.list:
Starting to prepare for specifying buffer time in other units
than buffers. Expose remove reason in signal.
2004-08-10 Wim Taymans <wim@fluendo.com> 2004-08-10 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add), * gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),

View file

@ -81,6 +81,9 @@ enum
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
#define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
#define DEFAULT_TIMEOUT 0 #define DEFAULT_TIMEOUT 0
@ -88,9 +91,17 @@ enum
{ {
ARG_0, ARG_0,
ARG_PROTOCOL, ARG_PROTOCOL,
ARG_BUFFERS_QUEUED,
ARG_BYTES_QUEUED,
ARG_TIME_QUEUED,
ARG_UNIT_TYPE,
ARG_UNITS_MAX,
ARG_UNITS_SOFT_MAX,
ARG_BUFFERS_MAX, ARG_BUFFERS_MAX,
ARG_BUFFERS_SOFT_MAX, ARG_BUFFERS_SOFT_MAX,
ARG_BUFFERS_QUEUED,
ARG_RECOVER_POLICY, ARG_RECOVER_POLICY,
ARG_TIMEOUT, ARG_TIMEOUT,
ARG_BYTES_TO_SERVE, ARG_BYTES_TO_SERVE,
@ -121,6 +132,45 @@ gst_recover_policy_get_type (void)
return recover_policy_type; return recover_policy_type;
} }
#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
static GType
gst_unit_type_get_type (void)
{
static GType unit_type_type = 0;
static GEnumValue unit_type[] = {
{GST_UNIT_TYPE_BUFFERS, "GST_UNIT_TYPE_BUFFERS", "Buffers"},
{GST_UNIT_TYPE_BYTES, "GST_UNIT_TYPE_BYTES", "Bytes"},
{GST_UNIT_TYPE_TIME, "GST_UNIT_TYPE_TIME", "Time"},
{0, NULL, NULL},
};
if (!unit_type_type) {
unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
}
return unit_type_type;
}
#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type())
static GType
gst_client_status_get_type (void)
{
static GType client_status_type = 0;
static GEnumValue client_status[] = {
{GST_CLIENT_STATUS_OK, "GST_CLIENT_STATUS_OK", "OK"},
{GST_CLIENT_STATUS_CLOSED, "GST_CLIENT_STATUS_CLOSED", "Closed"},
{GST_CLIENT_STATUS_REMOVED, "GST_CLIENT_STATUS_REMOVED", "Removed"},
{GST_CLIENT_STATUS_SLOW, "GST_CLIENT_STATUS_SLOW", "Too slow"},
{GST_CLIENT_STATUS_ERROR, "GST_CLIENT_STATUS_ERROR", "Error"},
{0, NULL, NULL},
};
if (!client_status_type) {
client_status_type =
g_enum_register_static ("GstTCPClientStatus", client_status);
}
return client_status_type;
}
static void gst_multifdsink_base_init (gpointer g_class); static void gst_multifdsink_base_init (gpointer g_class);
static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass); static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink); static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
@ -191,6 +241,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max", g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue (-1 = no limit)", -1, G_MAXINT, "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT,
@ -199,10 +250,33 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
g_param_spec_int ("buffers-soft-max", "Buffers soft max", g_param_spec_int ("buffers-soft-max", "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1, "Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_MAX,
g_param_spec_int ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT,
DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNITS_SOFT_MAX,
g_param_spec_int ("units-soft-max", "Units soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED,
g_param_spec_int ("buffers-queued", "Buffers queued", g_param_spec_uint ("buffers-queued", "Buffers queued",
"Number of buffers currently queued", 0, G_MAXINT, 0, "Number of buffers currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE)); G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED,
g_param_spec_uint ("bytes-queued", "Bytes queued",
"Number of bytes currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIME_QUEUED,
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE));
g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY,
g_param_spec_enum ("recover-policy", "Recover Policy", g_param_spec_enum ("recover-policy", "Recover Policy",
"How to recover when client reaches the soft max", "How to recover when client reaches the soft max",
@ -227,7 +301,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
gst_multifdsink_signals[SIGNAL_REMOVE] = gst_multifdsink_signals[SIGNAL_REMOVE] =
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, remove), G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
gst_multifdsink_signals[SIGNAL_CLEAR] = gst_multifdsink_signals[SIGNAL_CLEAR] =
g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, clear), G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
@ -245,8 +319,8 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] = gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED] =
g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT, client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
G_TYPE_NONE, 1, G_TYPE_INT); G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
gobject_class->set_property = gst_multifdsink_set_property; gobject_class->set_property = gst_multifdsink_set_property;
gobject_class->get_property = gst_multifdsink_get_property; gobject_class->get_property = gst_multifdsink_get_property;
@ -277,8 +351,9 @@ gst_multifdsink_init (GstMultiFdSink * this)
this->clients = NULL; this->clients = NULL;
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->buffers_max = DEFAULT_BUFFERS_MAX; this->unit_type = DEFAULT_UNIT_TYPE;
this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->recover_policy = DEFAULT_RECOVER_POLICY; this->recover_policy = DEFAULT_RECOVER_POLICY;
this->timeout = DEFAULT_TIMEOUT; this->timeout = DEFAULT_TIMEOUT;
@ -501,7 +576,7 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
g_signal_emit (G_OBJECT (sink), g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd); gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status);
/* lock again before we remove the client completely */ /* lock again before we remove the client completely */
g_mutex_lock (sink->clientslock); g_mutex_lock (sink->clientslock);
@ -734,8 +809,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
/* client can pick a buffer from the global queue */ /* client can pick a buffer from the global queue */
GstBuffer *buf; GstBuffer *buf;
/* grab buffer and ref, we need to ref since it could be unreffed in /* grab buffer */
* another thread */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--; client->bufpos--;
@ -809,12 +883,10 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
{ {
gint newbufpos; gint newbufpos;
/* FIXME: implement recover procedure here, like moving the position to
* the next keyframe, dropping buffers back to the beginning of the queue,
* stuff like that... */
GST_WARNING_OBJECT (sink, GST_WARNING_OBJECT (sink,
"client %p with fd %d is lagging, recover using policy %d", client, "client %p with fd %d is lagging at %d, recover using policy %d", client,
client->fd, sink->recover_policy); client->fd, client->bufpos, sink->recover_policy);
switch (sink->recover_policy) { switch (sink->recover_policy) {
case GST_RECOVER_POLICY_NONE: case GST_RECOVER_POLICY_NONE:
/* do nothing, client will catch up or get kicked out when it reaches /* do nothing, client will catch up or get kicked out when it reaches
@ -827,14 +899,15 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break; break;
case GST_RECOVER_POLICY_RESYNC_SOFT: case GST_RECOVER_POLICY_RESYNC_SOFT:
/* move to beginning of soft max */ /* move to beginning of soft max */
newbufpos = sink->buffers_soft_max; newbufpos = sink->units_soft_max;
break; break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME: case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* FIXME, find keyframe in buffers */ /* FIXME, find keyframe in buffers */
newbufpos = sink->buffers_soft_max; newbufpos = sink->units_soft_max;
break; break;
default: default:
newbufpos = sink->buffers_soft_max; /* unknown recovery procedure */
newbufpos = sink->units_soft_max;
break; break;
} }
return newbufpos; return newbufpos;
@ -890,7 +963,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", GST_LOG_OBJECT (sink, "client %p with fd %d at position %d",
client, client->fd, client->bufpos); client, client->fd, client->bufpos);
/* check soft max if needed, recover client */ /* check soft max if needed, recover client */
if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) { if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) {
gint newpos; gint newpos;
newpos = gst_multifdsink_recover_client (sink, client); newpos = gst_multifdsink_recover_client (sink, client);
@ -905,7 +978,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
} }
} }
/* check hard max and timeout, remove client */ /* check hard max and timeout, remove client */
if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) || if ((sink->units_max > 0 && client->bufpos >= sink->units_max) ||
(sink->timeout > 0 (sink->timeout > 0
&& now - client->last_activity_time > sink->timeout)) { && now - client->last_activity_time > sink->timeout)) {
/* remove client */ /* remove client */
@ -931,7 +1004,7 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
max_buffer_usage = client->bufpos; max_buffer_usage = client->bufpos;
} }
} }
/* nobody is referencing buffers after max_buffer_usage so we can /* nobody is referencing units after max_buffer_usage so we can
* remove them from the queue */ * remove them from the queue */
for (i = queuelen - 1; i > max_buffer_usage; i--) { for (i = queuelen - 1; i > max_buffer_usage; i--) {
GstBuffer *old; GstBuffer *old;
@ -1171,10 +1244,19 @@ gst_multifdsink_set_property (GObject * object, guint prop_id,
multifdsink->protocol = g_value_get_enum (value); multifdsink->protocol = g_value_get_enum (value);
break; break;
case ARG_BUFFERS_MAX: case ARG_BUFFERS_MAX:
multifdsink->buffers_max = g_value_get_int (value); multifdsink->units_max = g_value_get_int (value);
break; break;
case ARG_BUFFERS_SOFT_MAX: case ARG_BUFFERS_SOFT_MAX:
multifdsink->buffers_soft_max = g_value_get_int (value); multifdsink->units_soft_max = g_value_get_int (value);
break;
case ARG_UNIT_TYPE:
multifdsink->unit_type = g_value_get_enum (value);
break;
case ARG_UNITS_MAX:
multifdsink->units_max = g_value_get_int (value);
break;
case ARG_UNITS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int (value);
break; break;
case ARG_RECOVER_POLICY: case ARG_RECOVER_POLICY:
multifdsink->recover_policy = g_value_get_enum (value); multifdsink->recover_policy = g_value_get_enum (value);
@ -1203,13 +1285,28 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_enum (value, multifdsink->protocol); g_value_set_enum (value, multifdsink->protocol);
break; break;
case ARG_BUFFERS_MAX: case ARG_BUFFERS_MAX:
g_value_set_int (value, multifdsink->buffers_max); g_value_set_int (value, multifdsink->units_max);
break; break;
case ARG_BUFFERS_SOFT_MAX: case ARG_BUFFERS_SOFT_MAX:
g_value_set_int (value, multifdsink->buffers_soft_max); g_value_set_int (value, multifdsink->units_soft_max);
break; break;
case ARG_BUFFERS_QUEUED: case ARG_BUFFERS_QUEUED:
g_value_set_int (value, multifdsink->buffers_queued); g_value_set_uint (value, multifdsink->buffers_queued);
break;
case ARG_BYTES_QUEUED:
g_value_set_uint (value, multifdsink->bytes_queued);
break;
case ARG_TIME_QUEUED:
g_value_set_uint64 (value, multifdsink->time_queued);
break;
case ARG_UNIT_TYPE:
g_value_set_enum (value, multifdsink->unit_type);
break;
case ARG_UNITS_MAX:
g_value_set_int (value, multifdsink->units_max);
break;
case ARG_UNITS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
break; break;
case ARG_RECOVER_POLICY: case ARG_RECOVER_POLICY:
g_value_set_enum (value, multifdsink->recover_policy); g_value_set_enum (value, multifdsink->recover_policy);

View file

@ -77,11 +77,18 @@ typedef enum
typedef enum typedef enum
{ {
GST_CLIENT_STATUS_OK, GST_UNIT_TYPE_BUFFERS,
GST_CLIENT_STATUS_CLOSED, GST_UNIT_TYPE_TIME,
GST_CLIENT_STATUS_REMOVED, GST_UNIT_TYPE_BYTES,
GST_CLIENT_STATUS_SLOW, } GstUnitType;
GST_CLIENT_STATUS_ERROR,
typedef enum
{
GST_CLIENT_STATUS_OK = 0,
GST_CLIENT_STATUS_CLOSED = 1,
GST_CLIENT_STATUS_REMOVED = 2,
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
} GstClientStatus; } GstClientStatus;
/* structure for a client /* structure for a client
@ -138,12 +145,16 @@ struct _GstMultiFdSink {
gboolean running; /* the thread state */ gboolean running; /* the thread state */
GThread *thread; /* the sender thread */ GThread *thread; /* the sender thread */
gint buffers_max; /* max buffers to queue */ GstUnitType unit_type;/* the type of the units */
gint buffers_soft_max; /* max buffers a client can lag before recovery starts */ gint units_max; /* max units to queue */
gint units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy; GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */ GstClockTime timeout; /* max amount of nanoseconds to remain idle */
/* stats */ /* stats */
gint buffers_queued; /* number of queued buffers */ gint buffers_queued; /* number of queued buffers */
gint bytes_queued; /* number of queued bytes */
gint time_queued; /* number of queued time */
}; };
struct _GstMultiFdSinkClass { struct _GstMultiFdSinkClass {

View file

@ -1,3 +1,4 @@
VOID:STRING,UINT VOID:STRING,UINT
VOID:INT VOID:INT
VOID:INT,BOXED
BOXED:INT BOXED:INT