multihandlesink: further refactoring

This commit is contained in:
Thomas Vander Stichele 2012-01-27 15:46:31 +01:00
parent 99185cc8f5
commit 020739664a
8 changed files with 285 additions and 843 deletions

View file

@ -125,11 +125,6 @@
#define NOT_IMPLEMENTED 0
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug)
@ -154,16 +149,11 @@ enum
/* this is really arbitrarily chosen */
#define DEFAULT_MODE 1
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
#define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED
#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE 0
#define DEFAULT_QOS_DSCP -1
#define DEFAULT_HANDLE_READ TRUE
enum
@ -171,18 +161,11 @@ enum
PROP_0,
PROP_MODE,
PROP_UNIT_TYPE,
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
PROP_UNIT_FORMAT,
PROP_BUFFERS_MAX,
PROP_BUFFERS_SOFT_MAX,
PROP_BURST_UNIT,
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
PROP_QOS_DSCP,
PROP_HANDLE_READ,
PROP_NUM_FDS,
@ -210,23 +193,23 @@ gst_fdset_mode_get_type (void)
return fdset_mode_type;
}
#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type())
#define GST_TYPE_UNIT_FORMAT (gst_unit_format_get_type())
static GType
gst_unit_type_get_type (void)
gst_unit_format_get_type (void)
{
static GType unit_type_type = 0;
static const GEnumValue unit_type[] = {
{GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
{GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
{GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
{GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
static GType unit_format_type = 0;
static const GEnumValue unit_format[] = {
{GST_TCP_UNIT_FORMAT_UNDEFINED, "Undefined", "undefined"},
{GST_TCP_UNIT_FORMAT_BYTES, "Bytes", "bytes"},
{GST_TCP_UNIT_FORMAT_TIME, "Time", "time"},
{GST_TCP_UNIT_FORMAT_BUFFERS, "Buffers", "buffers"},
{0, NULL, NULL},
};
if (!unit_type_type) {
unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
if (!unit_format_type) {
unit_format_type = g_enum_register_static ("GstTCPUnitType", unit_format);
}
return unit_type_type;
return unit_format_type;
}
static void gst_multi_fd_sink_finalize (GObject * object);
@ -236,13 +219,15 @@ static void gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink);
static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink);
static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink,
GstBuffer * buffer);
static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink *
mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client);
static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id,
@ -258,12 +243,10 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSinkClass *gstbasesink_class;
GstMultiHandleSinkClass *gstmultihandlesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesink_class = (GstBaseSinkClass *) klass;
gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
gobject_class->set_property = gst_multi_fd_sink_set_property;
@ -284,47 +267,22 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
GST_TYPE_FDSET_MODE, DEFAULT_MODE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue for a client (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
g_param_spec_int ("buffers-soft-max", "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
g_param_spec_int64 ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
g_param_spec_int64 ("units-soft-max", "Units soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
g_param_spec_enum ("burst-unit", "Burst unit",
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
g_param_spec_enum ("burst-format", "Burst format",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
GST_TYPE_FORMAT, DEFAULT_BURST_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
g_param_spec_uint64 ("burst-value", "Burst value",
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
g_param_spec_int ("qos-dscp", "QoS diff srv code point",
"Quality of Service, differentiated services code point (-1 default)",
-1, 63, DEFAULT_QOS_DSCP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiFdSink::handle-read
*
@ -359,12 +317,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
* @gstmultifdsink: the multifdsink element to emit this signal on
* @fd: the file descriptor to add to multifdsink
* @sync: the sync method to use
* @unit_type_min: the unit-type of @value_min
* @unit_format_min: the unit-format of @value_min
* @value_min: the minimum amount of data to burst expressed in
* @unit_type_min units.
* @unit_type_max: the unit-type of @value_max
* @unit_format_min units.
* @unit_format_max: the unit-format of @value_max
* @value_max: the maximum amount of data to burst expressed in
* @unit_type_max units.
* @unit_format_max units.
*
* Hand the given open file descriptor to multifdsink to write to and
* specify the burst parameters for the new connection.
@ -374,8 +332,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
add_full), NULL, NULL,
gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE,
G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_FORMAT,
G_TYPE_UINT64, GST_TYPE_UNIT_FORMAT, G_TYPE_UINT64);
/**
* GstMultiFdSink::remove:
* @gstmultifdsink: the multifdsink element to emit this signal on
@ -477,17 +435,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
G_TYPE_NONE, 1, G_TYPE_INT);
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
gst_element_class_set_details_simple (gstelement_class,
"Multi filedescriptor sink", "Sink/Network",
"Send data to multiple filedescriptors",
"Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
gstmultihandlesink_class->clear_post =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post);
@ -499,6 +452,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread);
gstmultihandlesink_class->queue_buffer =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_queue_buffer);
gstmultihandlesink_class->client_queue_buffer =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer);
gstmultihandlesink_class->client_get_fd =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
@ -520,17 +479,12 @@ gst_multi_fd_sink_init (GstMultiFdSink * this)
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
this->unit_type = DEFAULT_UNIT_TYPE;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->unit_format = DEFAULT_UNIT_FORMAT;
this->def_burst_unit = DEFAULT_BURST_UNIT;
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
this->qos_dscp = DEFAULT_QOS_DSCP;
this->handle_read = DEFAULT_HANDLE_READ;
this->header_flags = 0;
}
static void
@ -543,90 +497,19 @@ gst_multi_fd_sink_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gint
setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
static int
gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
{
gint tos;
gint ret;
union gst_sockaddr
{
struct sockaddr sa;
struct sockaddr_in6 sa_in6;
struct sockaddr_storage sa_stor;
} sa;
socklen_t slen = sizeof (sa);
gint af;
GstTCPClient *tclient = (GstTCPClient *) client;
/* don't touch */
if (sink->qos_dscp < 0)
return 0;
if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
return ret;
}
af = sa.sa.sa_family;
/* if this is an IPv4-mapped address then do IPv4 QoS */
if (af == AF_INET6) {
GST_DEBUG_OBJECT (sink, "check IP6 socket");
if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
GST_DEBUG_OBJECT (sink, "mapped to IPV4");
af = AF_INET;
}
}
/* extract and shift 6 bits of the DSCP */
tos = (sink->qos_dscp & 0x3f) << 2;
switch (af) {
case AF_INET:
ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
break;
case AF_INET6:
#ifdef IPV6_TCLASS
ret =
setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
sizeof (tos));
break;
#endif
default:
ret = 0;
GST_ERROR_OBJECT (sink, "unsupported AF");
break;
}
if (ret)
GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
return ret;
}
static void
setup_dscp (GstMultiFdSink * sink)
{
GList *clients, *next;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
CLIENTS_LOCK (sink);
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
next = g_list_next (clients);
setup_dscp_client (sink, client);
}
CLIENTS_UNLOCK (sink);
return tclient->fd.fd;
}
/* "add-full" signal implementation */
void
gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
GstTCPUnitType max_unit, guint64 max_value)
GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value)
{
GstTCPClient *client;
GstMultiHandleClient *mhclient;
@ -636,12 +519,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
"min_unit %d, min_value %" G_GUINT64_FORMAT
", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
min_unit, min_value, max_unit, max_value);
"min_format %d, min_value %" G_GUINT64_FORMAT
", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
min_format, min_value, max_format, max_value);
/* do limits check if we can */
if (min_unit == max_unit) {
if (min_format == max_format) {
if (max_value != -1 && min_value != -1 && max_value < min_value)
goto wrong_limits;
}
@ -653,9 +536,9 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
client->fd.fd = fd;
client->burst_min_unit = min_unit;
client->burst_min_format = min_format;
client->burst_min_value = min_value;
client->burst_max_unit = max_unit;
client->burst_max_format = max_format;
client->burst_max_value = max_value;
CLIENTS_LOCK (sink);
@ -689,7 +572,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
/* figure out the mode, can't use send() for non sockets */
if (fstat (fd, &statbuf) == 0 && S_ISSOCK (statbuf.st_mode)) {
client->is_socket = TRUE;
setup_dscp_client (sink, client);
gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
}
gst_poll_restart (sink->fdset);
@ -707,7 +590,7 @@ wrong_limits:
GST_WARNING_OBJECT (sink,
"[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
min_value, max_value, min_unit);
min_value, max_value, min_format);
return;
}
duplicate:
@ -731,7 +614,8 @@ gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
mhsink = GST_MULTI_HANDLE_SINK (sink);
gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1);
sink->def_burst_format, sink->def_burst_value, sink->def_burst_format,
-1);
}
/* "remove" signal implementation */
@ -1086,16 +970,16 @@ ioctl_failed:
/* queue the given buffer for the given client */
static gboolean
gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
GstTCPClient * client, GstBuffer * buffer)
gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient, GstBuffer * buffer)
{
GstCaps *caps;
/* TRUE: send them if the new caps have them */
gboolean send_streamheader = FALSE;
GstStructure *s;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
GstTCPClient *client = (GstTCPClient *) mhclient;
/* before we queue the buffer, we check if we need to queue streamheader
* buffers (because it's a new client, or because they changed) */
@ -1212,10 +1096,10 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
switch (sink->unit_type) {
case GST_TCP_UNIT_TYPE_BUFFERS:
switch (sink->unit_format) {
case GST_TCP_UNIT_FORMAT_BUFFERS:
return max;
case GST_TCP_UNIT_TYPE_TIME:
case GST_TCP_UNIT_FORMAT_TIME:
{
GstBuffer *buf;
int i;
@ -1239,7 +1123,7 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
}
return len + 1;
}
case GST_TCP_UNIT_TYPE_BYTES:
case GST_TCP_UNIT_FORMAT_BYTES:
{
GstBuffer *buf;
int i;
@ -1383,23 +1267,23 @@ find_limits (GstMultiFdSink * sink,
* Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
*/
static gboolean
assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers,
GstClockTime * time)
{
gboolean res = TRUE;
/* set only the limit of the given format to the given value */
switch (unit) {
case GST_TCP_UNIT_TYPE_BUFFERS:
switch (format) {
case GST_TCP_UNIT_FORMAT_BUFFERS:
*buffers = (gint) value;
break;
case GST_TCP_UNIT_TYPE_TIME:
case GST_TCP_UNIT_FORMAT_TIME:
*time = value;
break;
case GST_TCP_UNIT_TYPE_BYTES:
case GST_TCP_UNIT_FORMAT_BYTES:
*bytes = (gint) value;
break;
case GST_TCP_UNIT_TYPE_UNDEFINED:
case GST_TCP_UNIT_FORMAT_UNDEFINED:
default:
res = FALSE;
break;
@ -1416,16 +1300,16 @@ assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
* function returns FALSE.
*/
static gboolean
count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
GstTCPUnitType max_unit, guint64 max_value)
count_burst_format (GstMultiFdSink * sink, gint * min_idx,
GstFormat min_format, guint64 min_value, gint * max_idx,
GstFormat max_format, guint64 max_value)
{
gint bytes_min = -1, buffers_min = -1;
gint bytes_max = -1, buffers_max = -1;
GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min);
assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max);
assign_value (min_format, min_value, &bytes_min, &buffers_min, &time_min);
assign_value (max_format, max_value, &bytes_max, &buffers_max, &time_max);
return find_limits (sink, min_idx, bytes_min, buffers_min, time_min,
max_idx, bytes_max, buffers_max, time_max);
@ -1518,11 +1402,11 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
* is not enough data, we just send what we have (which is in result).
* We use the max value to limit the search
*/
ok = count_burst_unit (sink, &result, client->burst_min_unit,
client->burst_min_value, &max, client->burst_max_unit,
ok = count_burst_format (sink, &result, client->burst_min_format,
client->burst_min_value, &max, client->burst_max_format,
client->burst_max_value);
GST_DEBUG_OBJECT (sink,
"[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d",
"[fd %5d] SYNC_METHOD_BURST: burst_format returned %d, result %d",
client->fd.fd, ok, result);
GST_LOG_OBJECT (sink, "min %d, max %d", result, max);
@ -1549,8 +1433,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
* NEXT_KEYFRAME.
*/
/* gather burst limits */
count_burst_unit (sink, &min_idx, client->burst_min_unit,
client->burst_min_value, &max_idx, client->burst_max_unit,
count_burst_format (sink, &min_idx, client->burst_min_format,
client->burst_min_value, &max_idx, client->burst_max_format,
client->burst_max_value);
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
@ -1596,8 +1480,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
* amount of data up 'till min.
*/
/* gather enough data to burst */
count_burst_unit (sink, &min_idx, client->burst_min_unit,
client->burst_min_value, &max_idx, client->burst_max_unit,
count_burst_format (sink, &min_idx, client->burst_min_format,
client->burst_min_value, &max_idx, client->burst_max_format,
client->burst_max_value);
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
@ -1662,10 +1546,11 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
gboolean flushing;
GstClockTime now;
GTimeVal nowtv;
GstMultiHandleSink *mhsink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
mhsink = GST_MULTI_HANDLE_SINK (sink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
@ -1731,7 +1616,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
fd, client, mhclient->bufpos);
/* queueing a buffer will ref it */
gst_multi_fd_sink_client_queue_buffer (sink, client, buf);
mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
/* need to start from the first byte for this new buffer */
mhclient->bufoffset = 0;
@ -1850,13 +1735,13 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break;
case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
/* move to beginning of soft max */
newbufpos = get_buffers_max (sink, sink->units_soft_max);
newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (mhsink->bufqueue->len - 1,
get_buffers_max (sink, sink->units_soft_max) - 1);
get_buffers_max (sink, mhsink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
@ -1871,7 +1756,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
break;
default:
/* unknown recovery procedure */
newbufpos = get_buffers_max (sink, sink->units_soft_max);
newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
}
return newbufpos;
@ -1896,7 +1781,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
* the select thread that the fd_set changed.
*/
static void
gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, GstBuffer * buffer)
{
GList *clients, *next;
gint queuelen;
@ -1907,25 +1792,25 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
CLIENTS_LOCK (sink);
CLIENTS_LOCK (mhsink);
/* add buffer to queue */
g_array_prepend_val (mhsink->bufqueue, buf);
g_array_prepend_val (mhsink->bufqueue, buffer);
queuelen = mhsink->bufqueue->len;
if (sink->units_max > 0)
max_buffers = get_buffers_max (sink, sink->units_max);
if (mhsink->units_max > 0)
max_buffers = get_buffers_max (sink, mhsink->units_max);
else
max_buffers = -1;
if (sink->units_soft_max > 0)
soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
if (mhsink->units_soft_max > 0)
soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@ -2260,116 +2145,6 @@ gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink)
return NULL;
}
static GstFlowReturn
gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiFdSink *sink;
gboolean in_caps;
#if 0
GstCaps *bufcaps, *padcaps;
#endif
GstMultiHandleSink *mhsink;
sink = GST_MULTI_FD_SINK (bsink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
#if 0
/* since we check every buffer for streamheader caps, we need to make
* sure every buffer has caps set */
bufcaps = gst_buffer_get_caps (buf);
padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
/* make sure we have caps on the pad */
if (!padcaps && !bufcaps)
goto no_caps;
#endif
/* get HEADER first, code below might mess with the flags */
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
#if 0
/* stamp the buffer with previous caps if no caps set */
if (!bufcaps) {
if (!gst_buffer_is_writable (buf)) {
/* metadata is not writable, copy will be made and original buffer
* will be unreffed so we need to ref so that we don't lose the
* buffer in the render method. */
gst_buffer_ref (buf);
/* the new buffer is ours only, we keep it out of the scope of this
* function */
buf = gst_buffer_make_writable (buf);
} else {
/* else the metadata is writable, we ref because we keep the buffer
* out of the scope of this method */
gst_buffer_ref (buf);
}
/* buffer metadata is writable now, set the caps */
gst_buffer_set_caps (buf, padcaps);
} else {
gst_caps_unref (bufcaps);
/* since we keep this buffer out of the scope of this method */
gst_buffer_ref (buf);
}
#endif
gst_buffer_ref (buf);
GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
GST_BUFFER_OFFSET_END (buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
/* if we get HEADER buffers, but the previous buffer was not HEADER,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (mhsink->streamheader);
mhsink->streamheader = NULL;
}
/* save the current in_caps */
sink->previous_buffer_in_caps = in_caps;
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
* FIXME: we could check if the buffer's contents are in fact part of the
* current streamheader.
*
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (in_caps) {
GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
gst_multi_fd_sink_queue_buffer (sink, buf);
mhsink->bytes_to_serve += gst_buffer_get_size (buf);
}
return GST_FLOW_OK;
/* ERRORS */
#if 0
no_caps:
{
GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
("Received first buffer without caps set"));
return GST_FLOW_NOT_NEGOTIATED;
}
#endif
}
static void
gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
@ -2382,31 +2157,15 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
case PROP_MODE:
multifdsink->mode = g_value_get_enum (value);
break;
case PROP_BUFFERS_MAX:
multifdsink->units_max = g_value_get_int (value);
case PROP_UNIT_FORMAT:
multifdsink->unit_format = g_value_get_enum (value);
break;
case PROP_BUFFERS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int (value);
break;
case PROP_UNIT_TYPE:
multifdsink->unit_type = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multifdsink->units_max = g_value_get_int64 (value);
break;
case PROP_UNITS_SOFT_MAX:
multifdsink->units_soft_max = g_value_get_int64 (value);
break;
case PROP_BURST_UNIT:
multifdsink->def_burst_unit = g_value_get_enum (value);
case PROP_BURST_FORMAT:
multifdsink->def_burst_format = g_value_get_enum (value);
break;
case PROP_BURST_VALUE:
multifdsink->def_burst_value = g_value_get_uint64 (value);
break;
case PROP_QOS_DSCP:
multifdsink->qos_dscp = g_value_get_int (value);
setup_dscp (multifdsink);
break;
case PROP_HANDLE_READ:
multifdsink->handle_read = g_value_get_boolean (value);
break;
@ -2429,30 +2188,15 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_MODE:
g_value_set_enum (value, multifdsink->mode);
break;
case PROP_BUFFERS_MAX:
g_value_set_int (value, multifdsink->units_max);
case PROP_UNIT_FORMAT:
g_value_set_enum (value, multifdsink->unit_format);
break;
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
break;
case PROP_UNIT_TYPE:
g_value_set_enum (value, multifdsink->unit_type);
break;
case PROP_UNITS_MAX:
g_value_set_int64 (value, multifdsink->units_max);
break;
case PROP_UNITS_SOFT_MAX:
g_value_set_int64 (value, multifdsink->units_soft_max);
break;
case PROP_BURST_UNIT:
g_value_set_enum (value, multifdsink->def_burst_unit);
case PROP_BURST_FORMAT:
g_value_set_enum (value, multifdsink->def_burst_format);
break;
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multifdsink->def_burst_value);
break;
case PROP_QOS_DSCP:
g_value_set_int (value, multifdsink->qos_dscp);
break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multifdsink->handle_read);
break;

View file

@ -49,19 +49,19 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
/**
* GstTCPUnitType:
* @GST_TCP_UNIT_TYPE_UNDEFINED: undefined
* @GST_TCP_UNIT_TYPE_BUFFERS : buffers
* @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds)
* @GST_TCP_UNIT_TYPE_BYTES : bytes
* @GST_TCP_UNIT_FORMAT_UNDEFINED: undefined
* @GST_TCP_UNIT_FORMAT_BUFFERS : buffers
* @GST_TCP_UNIT_FORMAT_TIME : timeunits (in nanoseconds)
* @GST_TCP_UNIT_FORMAT_BYTES : bytes
*
* The units used to specify limits.
*/
typedef enum
{
GST_TCP_UNIT_TYPE_UNDEFINED,
GST_TCP_UNIT_TYPE_BUFFERS,
GST_TCP_UNIT_TYPE_TIME,
GST_TCP_UNIT_TYPE_BYTES
GST_TCP_UNIT_FORMAT_UNDEFINED,
GST_TCP_UNIT_FORMAT_BUFFERS,
GST_TCP_UNIT_FORMAT_TIME,
GST_TCP_UNIT_FORMAT_BYTES,
} GstTCPUnitType;
/* structure for a client
@ -77,9 +77,9 @@ typedef struct {
/* method to sync client when connecting */
GstSyncMethod sync_method;
GstTCPUnitType burst_min_unit;
GstFormat burst_min_format;
guint64 burst_min_value;
GstTCPUnitType burst_max_unit;
GstFormat burst_max_format;
guint64 burst_max_value;
} GstTCPClient;
@ -97,19 +97,13 @@ struct _GstMultiFdSink {
gint mode;
GstPoll *fdset;
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
gboolean handle_read;
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstTCPUnitType unit_type;/* the type of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstFormat unit_format;/* the type of the units */
GstTCPUnitType def_burst_unit;
GstFormat def_burst_format;
guint64 def_burst_value;
guint8 header_flags;
@ -121,8 +115,8 @@ struct _GstMultiFdSinkClass {
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
GstTCPUnitType format, guint64 value,
GstTCPUnitType max_unit, guint64 max_value);
GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd);
void (*remove_flush) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink);
@ -142,8 +136,8 @@ GType gst_multi_fd_sink_get_type (void);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
GstTCPUnitType min_unit, guint64 min_value,
GstTCPUnitType max_unit, guint64 max_value);
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_clear (GstMultiHandleSink *sink);

View file

@ -144,13 +144,12 @@ enum
/* this is really arbitrarily chosen */
#define DEFAULT_MODE 1
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_TIME_MIN -1
#define DEFAULT_BYTES_MIN -1
#define DEFAULT_BUFFERS_MIN -1
#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS
#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
@ -167,21 +166,18 @@ enum
enum
{
PROP_0,
#if 0
PROP_MODE,
#endif
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
#if 0
PROP_UNIT_TYPE,
PROP_UNIT_FORMAT,
#endif
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
PROP_BUFFERS_MAX,
PROP_BUFFERS_SOFT_MAX,
#endif
PROP_TIME_MIN,
PROP_BYTES_MIN,
@ -196,9 +192,9 @@ enum
#if 0
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
#endif
PROP_QOS_DSCP,
#endif
PROP_RESEND_STREAMHEADER,
@ -304,9 +300,9 @@ static gboolean gst_multi_handle_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiHandleSink * sink);
#endif
#if 0
static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
#if 0
static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink);
#endif
@ -328,15 +324,11 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
#if 0
GstBaseSinkClass *gstbasesink_class;
#endif
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
#if 0
gstbasesink_class = (GstBaseSinkClass *) klass;
#endif
gobject_class->set_property = gst_multi_handle_sink_set_property;
gobject_class->get_property = gst_multi_handle_sink_get_property;
@ -374,10 +366,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#if 0
g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
g_param_spec_int64 ("units-max", "Units max",
@ -438,6 +430,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_param_spec_uint64 ("burst-value", "Burst value",
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
g_param_spec_int ("qos-dscp", "QoS diff srv code point",
@ -445,6 +438,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
-1, 63, DEFAULT_QOS_DSCP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#if 0
/**
* GstMultiHandleSink::handle-read
*
@ -638,9 +632,9 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
#if 0
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
#if 0
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop);
@ -673,7 +667,7 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
#endif
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->unit_type = DEFAULT_UNIT_TYPE;
this->unit_format = DEFAULT_UNIT_FORMAT;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->time_min = DEFAULT_TIME_MIN;
@ -683,15 +677,15 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
this->timeout = DEFAULT_TIMEOUT;
this->def_sync_method = DEFAULT_SYNC_METHOD;
#if 0
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
#endif
this->qos_dscp = DEFAULT_QOS_DSCP;
this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
this->header_flags = 0;
this->cancellable = g_cancellable_new ();
}
static void
@ -707,9 +701,9 @@ gst_multi_handle_sink_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
#if 0
static gint
setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
gint
gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink,
GstMultiHandleClient * client)
{
#ifndef IP_TOS
return 0;
@ -725,12 +719,13 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
} sa;
socklen_t slen = sizeof (sa);
gint af;
GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
/* don't touch */
if (sink->qos_dscp < 0)
return 0;
fd = g_socket_get_fd (client->socket);
fd = mhsinkclass->client_get_fd (client);
if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
@ -773,23 +768,6 @@ setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
#endif
}
static void
setup_dscp (GstMultiHandleSink * sink)
{
GList *clients;
CLIENTS_LOCK (sink);
for (clients = sink->clients; clients; clients = clients->next) {
GstSocketClient *client;
client = clients->data;
setup_dscp_client (sink, client);
}
CLIENTS_UNLOCK (sink);
}
#endif
void
gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
GstSyncMethod sync_method)
@ -818,6 +796,23 @@ gst_multi_handle_sink_client_init (GstMultiHandleClient * client,
client->last_activity_time = client->connect_time;
}
void
gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink)
{
GList *clients;
CLIENTS_LOCK (mhsink);
for (clients = mhsink->clients; clients; clients = clients->next) {
GstMultiHandleClient *client;
client = clients->data;
gst_multi_handle_sink_setup_dscp_client (mhsink, client);
}
CLIENTS_UNLOCK (mhsink);
}
#if 0
/* "add-full" signal implementation */
void
@ -1421,7 +1416,7 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
static gint
get_buffers_max (GstMultiHandleSink * sink, gint64 max)
{
switch (sink->unit_type) {
switch (sink->unit_format) {
case GST_FORMAT_BUFFERS:
return max;
case GST_FORMAT_TIME:
@ -2409,20 +2404,19 @@ gst_multi_handle_sink_thread (GstMultiHandleSink * sink)
}
#endif
#if 0
static GstFlowReturn
gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiHandleSink *sink;
gboolean in_caps;
#if 0
GstCaps *bufcaps, *padcaps;
#endif
sink = GST_MULTI_HANDLE_SINK (bsink);
GstMultiHandleSink *sink = GST_MULTI_HANDLE_SINK (bsink);
GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (sink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_WRONG_STATE);
GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
#if 0
/* since we check every buffer for streamheader caps, we need to make
@ -2436,7 +2430,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
#endif
/* get IN_CAPS first, code below might mess with the flags */
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
#if 0
/* stamp the buffer with previous caps if no caps set */
@ -2463,6 +2457,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
gst_buffer_ref (buf);
}
#endif
gst_buffer_ref (buf);
GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
@ -2477,7 +2472,7 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
* the old ones */
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new IN_CAPS buffers, clearing old streamheader");
"receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
@ -2495,12 +2490,12 @@ gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf)
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (in_caps) {
GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %"
GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
gst_multi_handle_sink_queue_buffer (sink, buf);
mhsinkclass->queue_buffer (sink, buf);
sink->bytes_to_serve += gst_buffer_get_size (buf);
}
@ -2516,7 +2511,6 @@ no_caps:
}
#endif
}
#endif
static void
gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
@ -2527,14 +2521,12 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
#if 0
case PROP_BUFFERS_MAX:
multihandlesink->units_max = g_value_get_int (value);
break;
case PROP_BUFFERS_SOFT_MAX:
multihandlesink->units_soft_max = g_value_get_int (value);
break;
#endif
case PROP_TIME_MIN:
multihandlesink->time_min = g_value_get_int64 (value);
break;
@ -2545,8 +2537,8 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
multihandlesink->buffers_min = g_value_get_int (value);
break;
#if 0
case PROP_UNIT_TYPE:
multihandlesink->unit_type = g_value_get_enum (value);
case PROP_UNIT_FORMAT:
multihandlesink->unit_format = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multihandlesink->units_max = g_value_get_int64 (value);
@ -2571,14 +2563,16 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
case PROP_BURST_VALUE:
multihandlesink->def_burst_value = g_value_get_uint64 (value);
break;
#endif
case PROP_QOS_DSCP:
multihandlesink->qos_dscp = g_value_get_int (value);
setup_dscp (multihandlesink);
gst_multi_handle_sink_setup_dscp (multihandlesink);
break;
case PROP_RESEND_STREAMHEADER:
multihandlesink->resend_streamheader = g_value_get_boolean (value);
break;
#endif
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2594,14 +2588,12 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
multihandlesink = GST_MULTI_HANDLE_SINK (object);
switch (prop_id) {
#if 0
case PROP_BUFFERS_MAX:
g_value_set_int (value, multihandlesink->units_max);
break;
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multihandlesink->units_soft_max);
break;
#endif
case PROP_TIME_MIN:
g_value_set_int64 (value, multihandlesink->time_min);
break;
@ -2621,8 +2613,8 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
g_value_set_uint64 (value, multihandlesink->time_queued);
break;
#if 0
case PROP_UNIT_TYPE:
g_value_set_enum (value, multihandlesink->unit_type);
case PROP_UNIT_FORMAT:
g_value_set_enum (value, multihandlesink->unit_format);
break;
case PROP_UNITS_MAX:
g_value_set_int64 (value, multihandlesink->units_max);
@ -2653,12 +2645,14 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multihandlesink->def_burst_value);
break;
#endif
case PROP_QOS_DSCP:
g_value_set_int (value, multihandlesink->qos_dscp);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multihandlesink->resend_streamheader);
break;
#if 0
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multihandlesink->socket_hash));

View file

@ -177,6 +177,8 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction);
gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
void gst_multi_handle_sink_setup_dscp (GstMultiHandleSink * mhsink);
gint gst_multi_handle_sink_setup_dscp_client (GstMultiHandleSink * sink, GstMultiHandleClient * client);
/**
* GstMultiHandleSink:
@ -200,7 +202,6 @@ struct _GstMultiHandleSink {
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
GArray *bufqueue; /* global queue of buffers */
@ -210,7 +211,7 @@ struct _GstMultiHandleSink {
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstFormat unit_type;/* the format of the units */
GstFormat unit_format;/* the format of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
@ -253,6 +254,15 @@ struct _GstMultiHandleSinkClass {
void (*stop_post) (GstMultiHandleSink *sink);
gboolean (*start_pre) (GstMultiHandleSink *sink);
gpointer (*thread) (GstMultiHandleSink *sink);
void (*queue_buffer) (GstMultiHandleSink *sink,
GstBuffer *buffer);
gboolean (*client_queue_buffer)
(GstMultiHandleSink *sink,
GstMultiHandleClient *client,
GstBuffer *buffer);
int (*client_get_fd)
(GstMultiHandleClient *client);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_client_link) (GstMultiHandleSink * sink, GList * link);

View file

@ -115,11 +115,6 @@
#define NOT_IMPLEMENTED 0
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
#define GST_CAT_DEFAULT (multisocketsink_debug)
@ -143,35 +138,20 @@ enum
/* this is really arbitrarily chosen */
#define DEFAULT_MODE 1
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS
#define DEFAULT_UNITS_MAX -1
#define DEFAULT_UNITS_SOFT_MAX -1
#define DEFAULT_UNIT_FORMAT GST_FORMAT_BUFFERS
#define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED
#define DEFAULT_BURST_VALUE 0
#define DEFAULT_QOS_DSCP -1
enum
{
PROP_0,
PROP_MODE,
PROP_UNIT_TYPE,
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
PROP_BUFFERS_MAX,
PROP_BUFFERS_SOFT_MAX,
PROP_UNIT_FORMAT,
PROP_BURST_FORMAT,
PROP_BURST_VALUE,
PROP_QOS_DSCP,
PROP_NUM_SOCKETS,
PROP_LAST
@ -183,14 +163,18 @@ static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
GstBuffer * buffer);
static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink *
mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer);
static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiSocketSink * sink);
static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
@ -222,30 +206,10 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize;
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
g_param_spec_int ("buffers-max", "Buffers max",
"max number of buffers to queue for a client (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
g_param_spec_int ("buffers-soft-max", "Buffers soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
g_param_spec_enum ("unit-type", "Units type",
g_object_class_install_property (gobject_class, PROP_UNIT_FORMAT,
g_param_spec_enum ("unit-format", "Units format",
"The unit to measure the max/soft-max/queued properties",
GST_TYPE_FORMAT, DEFAULT_UNIT_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
g_param_spec_int64 ("units-max", "Units max",
"max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
g_param_spec_int64 ("units-soft-max", "Units soft max",
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
GST_TYPE_FORMAT, DEFAULT_UNIT_FORMAT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
@ -258,12 +222,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
"The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
g_param_spec_int ("qos-dscp", "QoS diff srv code point",
"Quality of Service, differentiated services code point (-1 default)",
-1, 63, DEFAULT_QOS_DSCP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
@ -403,9 +361,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
G_TYPE_NONE, 1, G_TYPE_SOCKET);
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
gst_element_class_set_details_simple (gstelement_class,
"Multi socket sink", "Sink/Network",
"Send data to multiple sockets",
@ -413,8 +368,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
"Wim Taymans <wim@fluendo.com>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
@ -427,6 +380,12 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
gstmultihandlesink_class->queue_buffer =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_queue_buffer);
gstmultihandlesink_class->client_queue_buffer =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer);
gstmultihandlesink_class->client_get_fd =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
@ -446,16 +405,11 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
{
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
this->unit_type = DEFAULT_UNIT_TYPE;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
this->unit_format = DEFAULT_UNIT_FORMAT;
this->def_burst_format = DEFAULT_BURST_FORMAT;
this->def_burst_value = DEFAULT_BURST_VALUE;
this->qos_dscp = DEFAULT_QOS_DSCP;
this->header_flags = 0;
this->cancellable = g_cancellable_new ();
}
@ -473,86 +427,12 @@ gst_multi_socket_sink_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gint
setup_dscp_client (GstMultiSocketSink * sink, GstSocketClient * client)
static int
gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
{
#ifndef IP_TOS
return 0;
#else
gint tos;
gint ret;
int fd;
union gst_sockaddr
{
struct sockaddr sa;
struct sockaddr_in6 sa_in6;
struct sockaddr_storage sa_stor;
} sa;
socklen_t slen = sizeof (sa);
gint af;
GstSocketClient *msclient = (GstSocketClient *) client;
/* don't touch */
if (sink->qos_dscp < 0)
return 0;
fd = g_socket_get_fd (client->socket);
if ((ret = getsockname (fd, &sa.sa, &slen)) < 0) {
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
return ret;
}
af = sa.sa.sa_family;
/* if this is an IPv4-mapped address then do IPv4 QoS */
if (af == AF_INET6) {
GST_DEBUG_OBJECT (sink, "check IP6 socket");
if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
GST_DEBUG_OBJECT (sink, "mapped to IPV4");
af = AF_INET;
}
}
/* extract and shift 6 bits of the DSCP */
tos = (sink->qos_dscp & 0x3f) << 2;
switch (af) {
case AF_INET:
ret = setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
break;
case AF_INET6:
#ifdef IPV6_TCLASS
ret = setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos));
break;
#endif
default:
ret = 0;
GST_ERROR_OBJECT (sink, "unsupported AF");
break;
}
if (ret)
GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
return ret;
#endif
}
static void
setup_dscp (GstMultiSocketSink * sink)
{
GList *clients;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
CLIENTS_LOCK (sink);
for (clients = mhsink->clients; clients; clients = clients->next) {
GstSocketClient *client;
client = clients->data;
setup_dscp_client (sink, client);
}
CLIENTS_UNLOCK (sink);
return g_socket_get_fd (msclient->socket);
}
/* "add-full" signal implementation */
@ -615,7 +495,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
g_source_attach (client->source, sink->main_context);
}
setup_dscp_client (sink, client);
gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
CLIENTS_UNLOCK (sink);
@ -958,11 +838,11 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
/* queue the given buffer for the given client */
static gboolean
gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
GstSocketClient * client, GstBuffer * buffer)
gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
GstMultiHandleClient * mhclient, GstBuffer * buffer)
{
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstSocketClient *client = (GstSocketClient *) mhclient;
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstCaps *caps;
/* TRUE: send them if the new caps have them */
@ -1086,7 +966,7 @@ get_buffers_max (GstMultiSocketSink * sink, gint64 max)
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
switch (sink->unit_type) {
switch (sink->unit_format) {
case GST_FORMAT_BUFFERS:
return max;
case GST_FORMAT_TIME:
@ -1540,10 +1420,11 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
GstClockTime now;
GTimeVal nowtv;
GError *err = NULL;
GstMultiHandleSink *mhsink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
@ -1617,7 +1498,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
socket, client, mhclient->bufpos);
/* queueing a buffer will ref it */
gst_multi_socket_sink_client_queue_buffer (sink, client, buf);
mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
/* need to start from the first byte for this new buffer */
mhclient->bufoffset = 0;
@ -1730,13 +1611,13 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
break;
case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT:
/* move to beginning of soft max */
newbufpos = get_buffers_max (sink, sink->units_soft_max);
newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (mhsink->bufqueue->len - 1,
get_buffers_max (sink, sink->units_soft_max) - 1);
get_buffers_max (sink, mhsink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
@ -1751,7 +1632,7 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
break;
default:
/* unknown recovery procedure */
newbufpos = get_buffers_max (sink, sink->units_soft_max);
newbufpos = get_buffers_max (sink, mhsink->units_soft_max);
break;
}
return newbufpos;
@ -1776,7 +1657,8 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
* the select thread that the fd_set changed.
*/
static void
gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink,
GstBuffer * buf)
{
GList *clients, *next;
gint queuelen;
@ -1786,7 +1668,7 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
@ -1799,13 +1681,13 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
g_array_prepend_val (mhsink->bufqueue, buf);
queuelen = mhsink->bufqueue->len;
if (sink->units_max > 0)
max_buffers = get_buffers_max (sink, sink->units_max);
if (mhsink->units_max > 0)
max_buffers = get_buffers_max (sink, mhsink->units_max);
else
max_buffers = -1;
if (sink->units_soft_max > 0)
soft_max_buffers = get_buffers_max (sink, sink->units_soft_max);
if (mhsink->units_soft_max > 0)
soft_max_buffers = get_buffers_max (sink, mhsink->units_soft_max);
else
soft_max_buffers = -1;
GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers,
@ -2082,116 +1964,6 @@ gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
return NULL;
}
static GstFlowReturn
gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiSocketSink *sink;
gboolean in_caps;
#if 0
GstCaps *bufcaps, *padcaps;
#endif
GstMultiHandleSink *mhsink;
sink = GST_MULTI_SOCKET_SINK (bsink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
#if 0
/* since we check every buffer for streamheader caps, we need to make
* sure every buffer has caps set */
bufcaps = gst_buffer_get_caps (buf);
padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
/* make sure we have caps on the pad */
if (!padcaps && !bufcaps)
goto no_caps;
#endif
/* get HEADER first, code below might mess with the flags */
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
#if 0
/* stamp the buffer with previous caps if no caps set */
if (!bufcaps) {
if (!gst_buffer_is_writable (buf)) {
/* metadata is not writable, copy will be made and original buffer
* will be unreffed so we need to ref so that we don't lose the
* buffer in the render method. */
gst_buffer_ref (buf);
/* the new buffer is ours only, we keep it out of the scope of this
* function */
buf = gst_buffer_make_writable (buf);
} else {
/* else the metadata is writable, we ref because we keep the buffer
* out of the scope of this method */
gst_buffer_ref (buf);
}
/* buffer metadata is writable now, set the caps */
gst_buffer_set_caps (buf, padcaps);
} else {
gst_caps_unref (bufcaps);
/* since we keep this buffer out of the scope of this method */
gst_buffer_ref (buf);
}
#endif
gst_buffer_ref (buf);
GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT,
buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf),
GST_BUFFER_OFFSET_END (buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
/* if we get HEADER buffers, but the previous buffer was not HEADER,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (mhsink->streamheader);
mhsink->streamheader = NULL;
}
/* save the current in_caps */
sink->previous_buffer_in_caps = in_caps;
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
* FIXME: we could check if the buffer's contents are in fact part of the
* current streamheader.
*
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (in_caps) {
GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
gst_multi_socket_sink_queue_buffer (sink, buf);
mhsink->bytes_to_serve += gst_buffer_get_size (buf);
}
return GST_FLOW_OK;
/* ERRORS */
#if 0
no_caps:
{
GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
("Received first buffer without caps set"));
return GST_FLOW_NOT_NEGOTIATED;
}
#endif
}
static void
gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
@ -2201,20 +1973,8 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
multisocketsink = GST_MULTI_SOCKET_SINK (object);
switch (prop_id) {
case PROP_BUFFERS_MAX:
multisocketsink->units_max = g_value_get_int (value);
break;
case PROP_BUFFERS_SOFT_MAX:
multisocketsink->units_soft_max = g_value_get_int (value);
break;
case PROP_UNIT_TYPE:
multisocketsink->unit_type = g_value_get_enum (value);
break;
case PROP_UNITS_MAX:
multisocketsink->units_max = g_value_get_int64 (value);
break;
case PROP_UNITS_SOFT_MAX:
multisocketsink->units_soft_max = g_value_get_int64 (value);
case PROP_UNIT_FORMAT:
multisocketsink->unit_format = g_value_get_enum (value);
break;
case PROP_BURST_FORMAT:
multisocketsink->def_burst_format = g_value_get_enum (value);
@ -2222,10 +1982,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
case PROP_BURST_VALUE:
multisocketsink->def_burst_value = g_value_get_uint64 (value);
break;
case PROP_QOS_DSCP:
multisocketsink->qos_dscp = g_value_get_int (value);
setup_dscp (multisocketsink);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -2242,20 +1998,8 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
multisocketsink = GST_MULTI_SOCKET_SINK (object);
switch (prop_id) {
case PROP_BUFFERS_MAX:
g_value_set_int (value, multisocketsink->units_max);
break;
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multisocketsink->units_soft_max);
break;
case PROP_UNIT_TYPE:
g_value_set_enum (value, multisocketsink->unit_type);
break;
case PROP_UNITS_MAX:
g_value_set_int64 (value, multisocketsink->units_max);
break;
case PROP_UNITS_SOFT_MAX:
g_value_set_int64 (value, multisocketsink->units_soft_max);
case PROP_UNIT_FORMAT:
g_value_set_enum (value, multisocketsink->unit_format);
break;
case PROP_BURST_FORMAT:
g_value_set_enum (value, multisocketsink->def_burst_format);
@ -2263,9 +2007,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
case PROP_BURST_VALUE:
g_value_set_uint64 (value, multisocketsink->def_burst_value);
break;
case PROP_QOS_DSCP:
g_value_set_int (value, multisocketsink->qos_dscp);
break;
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));

View file

@ -83,13 +83,10 @@ struct _GstMultiSocketSink {
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstFormat unit_type;/* the format of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstFormat unit_format;/* the format of the units */
GstFormat def_burst_format;
guint64 def_burst_value;

View file

@ -26,6 +26,9 @@
#include <gst/check/gstcheck.h>
/* FIXME: remove this header once formats are refactored */
#include "gst/tcp/gstmultifdsink.h"
static GstPad *mysrcpad;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@ -457,7 +460,6 @@ GST_START_TEST (test_burst_client_bytes)
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
gint i;
guint buffers_queued;
@ -465,7 +467,7 @@ GST_START_TEST (test_burst_client_bytes)
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 3, NULL); /* 3 = burst */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
@ -505,38 +507,26 @@ GST_START_TEST (test_burst_client_bytes)
/* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
* the max alows it) */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@ -556,15 +546,14 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */
g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
@ -574,8 +563,8 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
caps = gst_caps_from_string ("application/x-gst-check");
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
gst_pad_set_caps (mysrcpad, caps);
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
@ -611,34 +600,24 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so get one below min */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 2 buffers (32 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@ -658,15 +637,14 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
int pfd1[2];
int pfd2[2];
int pfd3[2];
gchar data[16];
gint i;
guint buffers_queued;
sink = setup_multifdsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
g_object_set (sink, "burst-unit", 3, NULL); /* 3 = bytes */
g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */
g_object_set (sink, "burst-format", GST_TCP_UNIT_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
fail_if (pipe (pfd1) == -1);
@ -713,40 +691,27 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
/* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
* keyframe at buffer 4 */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000004", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000005", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000004");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000005");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000006");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000007");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000008");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000009");
/* second client only bursts 50 bytes = 4 buffers, there is
* no keyframe above min and below max, so send min */
GST_DEBUG ("Reading from client 2");
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000006", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd2[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000006");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000007");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000008");
fail_unless_read ("client 2", pfd2[0], 16, "deadbee00000009");
/* third client only bursts 50 bytes = 4 buffers, we can't send
* more than 50 bytes so we only get 3 buffers (48 bytes). */
GST_DEBUG ("Reading from client 3");
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000007", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000008", 16) == 0);
fail_if (read (pfd3[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000009", 16) == 0);
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000007");
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000008");
fail_unless_read ("client 3", pfd3[0], 16, "deadbee00000009");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@ -765,7 +730,6 @@ GST_START_TEST (test_client_next_keyframe)
GstElement *sink;
GstCaps *caps;
int pfd1[2];
gchar data[16];
gint i;
sink = setup_multifdsink ();
@ -793,10 +757,8 @@ GST_START_TEST (test_client_next_keyframe)
/* now we should be able to read some data */
GST_DEBUG ("Reading from client 1");
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000000", 16) == 0);
fail_if (read (pfd1[0], data, 16) < 16);
fail_unless (strncmp (data, "deadbee00000001", 16) == 0);
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000000");
fail_unless_read ("client 1", pfd1[0], 16, "deadbee00000001");
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);

View file

@ -596,7 +596,7 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
sink = setup_multisocketsink ();
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 4, NULL); /* 3 = burst_keyframe */
g_object_set (sink, "sync-method", 4, NULL); /* 4 = burst_keyframe */
g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);
@ -700,7 +700,7 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
/* make sure we keep at least 100 bytes at all times */
g_object_set (sink, "bytes-min", 100, NULL);
g_object_set (sink, "sync-method", 5, NULL); /* 3 = burst_with_keyframe */
g_object_set (sink, "sync-method", 5, NULL); /* 5 = burst_with_keyframe */
g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
g_object_set (sink, "burst-value", (guint64) 80, NULL);