multihandlesink: further refactoring

This commit is contained in:
Thomas Vander Stichele 2012-01-26 23:19:33 +01:00
parent d21c42a709
commit 604ddc8740
8 changed files with 3237 additions and 842 deletions

View file

@ -166,15 +166,10 @@ enum
#define DEFAULT_QOS_DSCP -1
#define DEFAULT_HANDLE_READ TRUE
#define DEFAULT_RESEND_STREAMHEADER TRUE
enum
{
PROP_0,
PROP_MODE,
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
PROP_UNIT_TYPE,
PROP_UNITS_MAX,
@ -190,8 +185,6 @@ enum
PROP_HANDLE_READ,
PROP_RESEND_STREAMHEADER,
PROP_NUM_FDS,
PROP_LAST
@ -238,13 +231,17 @@ gst_unit_type_get_type (void)
static void gst_multi_fd_sink_finalize (GObject * object);
static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink,
static void gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink);
static void gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink);
static void gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink);
static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement *
element, GstStateChange transition);
static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -313,21 +310,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
g_param_spec_uint ("buffers-queued", "Buffers queued",
"Number of buffers currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#if NOT_IMPLEMENTED
g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
g_param_spec_uint ("bytes-queued", "Bytes queued",
"Number of bytes currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
g_param_spec_enum ("burst-unit", "Burst unit",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
@ -354,18 +336,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
g_param_spec_boolean ("handle-read", "Handle Read",
"Handle client reads and discard the data",
DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiFdSink::resend-streamheader
*
* Resend the streamheaders to existing clients when they change.
*
* Since: 0.10.23
*/
g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
"Resend the streamheader if it changes in the caps",
DEFAULT_RESEND_STREAMHEADER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_FDS,
g_param_spec_uint ("num-fds", "Number of fds",
@ -516,12 +486,23 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
"Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render);
gstmultihandlesink_class->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
gstmultihandlesink_class->clear_post =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear_post);
gstmultihandlesink_class->stop_pre =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_stop_pre);
gstmultihandlesink_class->stop_post =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_stop_post);
gstmultihandlesink_class->start_pre =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_thread);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link);
klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
@ -535,15 +516,10 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
static void
gst_multi_fd_sink_init (GstMultiFdSink * this)
{
GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);
this->mode = DEFAULT_MODE;
CLIENTS_LOCK_INIT (this);
this->clients = NULL;
this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal);
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->unit_type = DEFAULT_UNIT_TYPE;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
@ -554,21 +530,15 @@ gst_multi_fd_sink_init (GstMultiFdSink * this)
this->qos_dscp = DEFAULT_QOS_DSCP;
this->handle_read = DEFAULT_HANDLE_READ;
this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
this->header_flags = 0;
}
static void
gst_multi_fd_sink_finalize (GObject * object)
{
GstMultiFdSink *this;
GstMultiFdSink *this = GST_MULTI_FD_SINK (object);
this = GST_MULTI_FD_SINK (object);
CLIENTS_LOCK_CLEAR (this);
g_hash_table_destroy (this->fd_hash);
g_array_free (this->bufqueue, TRUE);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -638,9 +608,10 @@ static void
setup_dscp (GstMultiFdSink * sink)
{
GList *clients, *next;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
CLIENTS_LOCK (sink);
for (clients = sink->clients; clients; clients = next) {
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
@ -662,6 +633,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
GList *clink;
gint flags;
struct stat statbuf;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
"min_unit %d, min_value %" G_GUINT64_FORMAT
@ -678,6 +650,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
client = g_new0 (GstTCPClient, 1);
mhclient = (GstMultiHandleClient *) client;
gst_multi_handle_sink_client_init (mhclient, sync_method);
g_snprintf (mhclient->debug, 30, "[fd %5d]", fd);
client->fd.fd = fd;
client->burst_min_unit = min_unit;
@ -693,9 +666,9 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
goto duplicate;
/* we can add the fd now */
clink = sink->clients = g_list_prepend (sink->clients, client);
clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
sink->clients_cookie++;
mhsink->clients_cookie++;
/* set the socket to non blocking */
if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) {
@ -766,6 +739,9 @@ void
gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
{
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
@ -783,7 +759,8 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
}
mhclient->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
// FIXME: specific poll
gst_poll_restart (sink->fdset);
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
@ -828,39 +805,13 @@ done:
CLIENTS_UNLOCK (sink);
}
/* can be called both through the signal (i.e. from any thread) or when
* stopping, after the writing thread has shut down */
void
gst_multi_fd_sink_clear (GstMultiHandleSink * mhsink)
/* called with the CLIENTS_LOCK held */
static void
gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink)
{
GList *clients, *next;
guint32 cookie;
GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
GST_DEBUG_OBJECT (sink, "clearing all clients");
CLIENTS_LOCK (sink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
GstMultiHandleClient *mhclient;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
goto restart;
}
mhclient = (GstMultiHandleClient *) clients->data;
next = g_list_next (clients);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
/* the next call changes the list, which is why we iterate
* with a temporary next pointer */
gst_multi_fd_sink_remove_client_link (sink, clients);
}
gst_poll_restart (sink->fdset);
CLIENTS_UNLOCK (sink);
}
/* "get-stats" signal implementation
@ -955,12 +906,13 @@ noclient:
* close the fd itself.
*/
static void
gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
{
int fd;
GTimeVal now;
GstTCPClient *client = (GstTCPClient *) link->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink);
GstMultiFdSinkClass *fclass;
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
@ -968,7 +920,8 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
fd = client->fd.fd;
if (mhclient->currently_removing) {
GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd);
GST_WARNING_OBJECT (sink, "%s client is already being removed",
mhclient->debug);
return;
} else {
mhclient->currently_removing = TRUE;
@ -977,34 +930,36 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
/* FIXME: if we keep track of ip we can log it here and signal */
switch (mhclient->status) {
case GST_CLIENT_STATUS_OK:
GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason",
fd, client);
GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
mhclient->debug, client);
break;
case GST_CLIENT_STATUS_CLOSED:
GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close",
fd, client);
GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
mhclient->debug, client);
break;
case GST_CLIENT_STATUS_REMOVED:
GST_DEBUG_OBJECT (sink,
"[fd %5d] removing client %p because the app removed it", fd, client);
"%s removing client %p because the app removed it", mhclient->debug,
client);
break;
case GST_CLIENT_STATUS_SLOW:
GST_INFO_OBJECT (sink,
"[fd %5d] removing client %p because it was too slow", fd, client);
"%s removing client %p because it was too slow", mhclient->debug,
client);
break;
case GST_CLIENT_STATUS_ERROR:
GST_WARNING_OBJECT (sink,
"[fd %5d] removing client %p because of error", fd, client);
"%s removing client %p because of error", mhclient->debug, client);
break;
case GST_CLIENT_STATUS_FLUSHING:
default:
GST_WARNING_OBJECT (sink,
"[fd %5d] removing client %p with invalid reason %d", fd, client,
mhclient->status);
"%s removing client %p with invalid reason %d", mhclient->debug,
client, mhclient->status);
break;
}
gst_poll_remove_fd (sink->fdset, &client->fd);
gst_poll_remove_fd (mfsink->fdset, &client->fd);
g_get_current_time (&now);
mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now);
@ -1031,7 +986,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
/* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) {
GST_WARNING_OBJECT (sink,
"[fd %5d] error removing client %p from hash", client->fd.fd, client);
}
@ -1044,7 +999,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (sink, client->fd.fd);
fclass->removed (mfsink, client->fd.fd);
g_free (client);
CLIENTS_UNLOCK (sink);
@ -1129,18 +1084,6 @@ ioctl_failed:
}
}
static gboolean
is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
return TRUE;
}
return FALSE;
}
/* queue the given buffer for the given client */
static gboolean
gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
@ -1152,6 +1095,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
gboolean send_streamheader = FALSE;
GstStructure *s;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
/* before we queue the buffer, we check if we need to queue streamheader
* buffers (because it's a new client, or because they changed) */
@ -1186,7 +1130,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
send_streamheader = TRUE;
} else {
/* both old and new caps have streamheader set */
if (!sink->resend_streamheader) {
if (!mhsink->resend_streamheader) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] asked to not resend the streamheader, not sending",
client->fd.fd);
@ -1259,40 +1203,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
return TRUE;
}
/* find the keyframe in the list of buffers starting the
* search from @idx. @direction as -1 will search backwards,
* 1 will search forwards.
* Returns: the index or -1 if there is no keyframe after idx.
*/
static gint
find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
{
gint i, len, result;
/* take length of queued buffers */
len = sink->bufqueue->len;
/* assume we don't find a keyframe */
result = -1;
/* then loop over all buffers to find the first keyframe */
for (i = idx; i >= 0 && i < len; i += direction) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
i, idx, direction);
result = i;
break;
}
}
return result;
}
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
/* Get the number of buffers from the buffer queue needed to satisfy
* the maximum max in the configured units.
* If units are not BUFFERS, and there are insufficient buffers in the
@ -1300,6 +1210,8 @@ find_syncframe (GstMultiFdSink * sink, gint idx, gint direction)
static gint
get_buffers_max (GstMultiFdSink * sink, gint64 max)
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
switch (sink->unit_type) {
case GST_TCP_UNIT_TYPE_BUFFERS:
return max;
@ -1311,10 +1223,10 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
gint64 diff;
GstClockTime first = GST_CLOCK_TIME_NONE;
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
if (first == -1)
first = GST_BUFFER_TIMESTAMP (buf);
@ -1334,10 +1246,10 @@ get_buffers_max (GstMultiFdSink * sink, gint64 max)
int len;
gint acc = 0;
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
acc += gst_buffer_get_size (buf);
if (acc > max)
@ -1370,9 +1282,10 @@ find_limits (GstMultiFdSink * sink,
GstClockTime first, time;
gint i, len, bytes;
gboolean result, max_hit;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
/* take length of queue */
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
/* this must hold */
g_assert (len > 0);
@ -1419,7 +1332,7 @@ find_limits (GstMultiFdSink * sink,
result = *min_idx != -1;
break;
}
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
bytes += gst_buffer_get_size (buf);
@ -1531,11 +1444,12 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
{
gint result;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink,
"[fd %5d] new client, deciding where to start in queue", client->fd.fd);
GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
sink->bufqueue->len);
mhsink->bufqueue->len);
switch (mhclient->sync_method) {
case GST_SYNC_METHOD_LATEST:
/* no syncing, we are happy with whatever the client is going to get */
@ -1551,7 +1465,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
"[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
mhclient->bufpos);
result = find_prev_syncframe (sink, mhclient->bufpos);
result = find_prev_syncframe (mhsink, mhclient->bufpos);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d",
@ -1577,7 +1491,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
* we need to wait for the next keyframe and so we change the client's
* sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
*/
result = find_next_syncframe (sink, 0);
result = find_next_syncframe (mhsink, 0);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd,
@ -1642,7 +1556,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
/* first find a keyframe after min_idx */
next_syncframe = find_next_syncframe (sink, min_idx);
next_syncframe = find_next_syncframe (mhsink, min_idx);
if (next_syncframe != -1 && next_syncframe < max_idx) {
/* we have a valid keyframe and it's below the max */
GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
@ -1651,7 +1565,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
}
/* no valid keyframe, try to find one below min */
prev_syncframe = find_prev_syncframe (sink, min_idx);
prev_syncframe = find_prev_syncframe (mhsink, min_idx);
if (prev_syncframe != -1) {
GST_WARNING_OBJECT (sink,
"using keyframe below min in BURST_KEYFRAME sync mode");
@ -1689,7 +1603,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
/* first find a keyframe after min_idx */
next_syncframe = find_next_syncframe (sink, min_idx);
next_syncframe = find_next_syncframe (mhsink, min_idx);
if (next_syncframe != -1 && next_syncframe < max_idx) {
/* we have a valid keyframe and it's below the max */
GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
@ -1799,7 +1713,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
goto flushed;
/* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, mhclient->bufpos);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
mhclient->bufpos--;
/* update stats */
@ -1941,14 +1855,14 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (sink->bufqueue->len - 1,
newbufpos = MIN (mhsink->bufqueue->len - 1,
get_buffers_max (sink, sink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
if (is_sync_frame (sink, buf)) {
buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
if (is_sync_frame (mhsink, buf)) {
/* found a buffer that is not a delta unit */
break;
}
@ -1993,17 +1907,17 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
GstMultiHandleSink *mhsink;
mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
CLIENTS_LOCK (sink);
/* add buffer to queue */
g_array_prepend_val (sink->bufqueue, buf);
queuelen = sink->bufqueue->len;
g_array_prepend_val (mhsink->bufqueue, buf);
queuelen = mhsink->bufqueue->len;
if (sink->units_max > 0)
max_buffers = get_buffers_max (sink, sink->units_max);
@ -2021,12 +1935,12 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
max_buffer_usage = 0;
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
cookie = mhsink->clients_cookie;
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
GstMultiHandleClient *mhclient;
if (cookie != sink->clients_cookie) {
if (cookie != mhsink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
goto restart;
}
@ -2067,7 +1981,7 @@ restart:
mhclient->status = GST_CLIENT_STATUS_SLOW;
/* set client to invalid position while being removed */
mhclient->bufpos = -1;
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
need_signal = TRUE;
continue;
} else if (mhclient->bufpos == 0 || mhclient->new_connection) {
@ -2119,8 +2033,8 @@ restart:
"extending queue to include sync point, now at %d, limit is %d",
max_buffer_usage, limit);
for (i = 0; i < limit; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
if (is_sync_frame (mhsink, buf)) {
/* found a sync frame, now extend the buffer usage to
* include at least this frame. */
max_buffer_usage = MAX (max_buffer_usage, i);
@ -2140,14 +2054,14 @@ restart:
/* queue exceeded max size */
queuelen--;
old = g_array_index (sink->bufqueue, GstBuffer *, i);
sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
/* unref tail buffer */
gst_buffer_unref (old);
}
/* save for stats */
sink->buffers_queued = max_buffer_usage;
mhsink->buffers_queued = max_buffer_usage;
CLIENTS_UNLOCK (sink);
/* and send a signal to thread if fd_set changed */
@ -2172,9 +2086,10 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
gboolean try_again;
GstMultiFdSinkClass *fclass;
guint cookie;
GstMultiHandleSink *mhsink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
mhsink = GST_MULTI_HANDLE_SINK (sink);
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
@ -2201,7 +2116,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
now = GST_TIMEVAL_TO_TIME (nowtv);
CLIENTS_LOCK (sink);
for (clients = sink->clients; clients; clients = next) {
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
GstMultiHandleClient *mhclient;
@ -2211,7 +2126,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
if (mhsink->timeout > 0
&& now - mhclient->last_activity_time > mhsink->timeout) {
mhclient->status = GST_CLIENT_STATUS_SLOW;
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
}
}
CLIENTS_UNLOCK (sink);
@ -2224,15 +2139,15 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
* the ones that give an error to the F_GETFL fcntl. */
CLIENTS_LOCK (sink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
cookie = mhsink->clients_cookie;
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
GstMultiHandleClient *mhclient;
int fd;
long flags;
int res;
if (cookie != sink->clients_cookie) {
if (cookie != mhsink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd");
goto restart;
}
@ -2250,7 +2165,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
if (errno == EBADF) {
mhclient->status = GST_CLIENT_STATUS_ERROR;
/* releases the CLIENTS lock */
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
}
}
}
@ -2283,12 +2198,12 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
CLIENTS_LOCK (sink);
restart2:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
cookie = mhsink->clients_cookie;
for (clients = mhsink->clients; clients; clients = next) {
GstTCPClient *client;
GstMultiHandleClient *mhclient;
if (sink->clients_cookie != cookie) {
if (mhsink->clients_cookie != cookie) {
GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date");
goto restart2;
}
@ -2299,32 +2214,32 @@ restart2:
if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
&& mhclient->status != GST_CLIENT_STATUS_OK) {
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
mhclient->status = GST_CLIENT_STATUS_CLOSED;
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
mhclient->status = GST_CLIENT_STATUS_ERROR;
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
/* handle client read */
if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
}
if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
/* handle client write */
if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
}
}
@ -2335,9 +2250,11 @@ restart2:
/* we handle the client communication in another thread so that we do not block
* the gstreamer thread while we select() on the client fds */
static gpointer
gst_multi_fd_sink_thread (GstMultiFdSink * sink)
gst_multi_fd_sink_thread (GstMultiHandleSink * mhsink)
{
while (sink->running) {
GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
while (mhsink->running) {
gst_multi_fd_sink_handle_clients (sink);
}
return NULL;
@ -2370,8 +2287,8 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
goto no_caps;
#endif
/* get IN_CAPS first, code below might mess with the flags */
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
/* get HEADER first, code below might mess with the flags */
in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
#if 0
/* stamp the buffer with previous caps if no caps set */
@ -2408,15 +2325,15 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
/* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
/* if we get HEADER buffers, but the previous buffer was not HEADER,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new IN_CAPS buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
"receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (mhsink->streamheader);
mhsink->streamheader = NULL;
}
/* save the current in_caps */
@ -2431,9 +2348,9 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
* We don't send the buffer to the client, since streamheaders are sent
* separately when necessary. */
if (in_caps) {
GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %"
GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
gst_multi_fd_sink_queue_buffer (sink, buf);
@ -2493,9 +2410,6 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id,
case PROP_HANDLE_READ:
multifdsink->handle_read = g_value_get_boolean (value);
break;
case PROP_RESEND_STREAMHEADER:
multifdsink->resend_streamheader = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -2521,15 +2435,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multifdsink->units_soft_max);
break;
case PROP_BUFFERS_QUEUED:
g_value_set_uint (value, multifdsink->buffers_queued);
break;
case PROP_BYTES_QUEUED:
g_value_set_uint (value, multifdsink->bytes_queued);
break;
case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multifdsink->time_queued);
break;
case PROP_UNIT_TYPE:
g_value_set_enum (value, multifdsink->unit_type);
break;
@ -2551,9 +2456,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_HANDLE_READ:
g_value_set_boolean (value, multifdsink->handle_read);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multifdsink->resend_streamheader);
break;
case PROP_NUM_FDS:
g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
break;
@ -2564,52 +2466,21 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_multi_fd_sink_start (GstBaseSink * bsink)
gst_multi_fd_sink_start_pre (GstMultiHandleSink * mhsink)
{
GstMultiFdSinkClass *fclass;
GstMultiFdSink *this;
GstMultiHandleSink *mhsink;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
this = GST_MULTI_FD_SINK (bsink);
mhsink = GST_MULTI_HANDLE_SINK (bsink);
fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
if ((this->fdset = gst_poll_new (TRUE)) == NULL)
GST_INFO_OBJECT (mfsink, "starting in mode %d", mfsink->mode);
if ((mfsink->fdset = gst_poll_new (TRUE)) == NULL)
goto socket_pair;
this->streamheader = NULL;
mhsink->bytes_to_serve = 0;
mhsink->bytes_served = 0;
if (fclass->init) {
fclass->init (this);
}
this->running = TRUE;
#if !GLIB_CHECK_VERSION (2, 31, 0)
this->thread = g_thread_create ((GThreadFunc) gst_multi_fd_sink_thread,
this, TRUE, NULL);
#else
this->thread = g_thread_new ("multifdsink",
(GThreadFunc) gst_multi_fd_sink_thread, this);
#endif
GST_OBJECT_FLAG_SET (this, GST_MULTI_HANDLE_SINK_OPEN);
return TRUE;
/* ERRORS */
socket_pair:
{
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ELEMENT_ERROR (mfsink, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
@ -2621,113 +2492,24 @@ multifdsink_hash_remove (gpointer key, gpointer value, gpointer data)
return TRUE;
}
static gboolean
gst_multi_fd_sink_stop (GstBaseSink * bsink)
static void
gst_multi_fd_sink_stop_pre (GstMultiHandleSink * mhsink)
{
GstMultiFdSinkClass *fclass;
GstMultiHandleSinkClass *mhclass;
GstMultiFdSink *this;
GstBuffer *buf;
int i;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
this = GST_MULTI_FD_SINK (bsink);
fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (this);
gst_poll_set_flushing (mfsink->fdset, TRUE);
if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
this->running = FALSE;
gst_poll_set_flushing (this->fdset, TRUE);
if (this->thread) {
GST_DEBUG_OBJECT (this, "joining thread");
g_thread_join (this->thread);
GST_DEBUG_OBJECT (this, "joined thread");
this->thread = NULL;
}
/* free the clients */
mhclass->clear (GST_MULTI_HANDLE_SINK (this));
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
this->streamheader = NULL;
}
if (fclass->close)
fclass->close (this);
if (this->fdset) {
gst_poll_free (this->fdset);
this->fdset = NULL;
}
g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);
/* remove all queued buffers */
if (this->bufqueue) {
GST_DEBUG_OBJECT (this, "Emptying bufqueue with %d buffers",
this->bufqueue->len);
for (i = this->bufqueue->len - 1; i >= 0; --i) {
buf = g_array_index (this->bufqueue, GstBuffer *, i);
GST_LOG_OBJECT (this, "Removing buffer %p (%d) with refcount %d", buf, i,
GST_MINI_OBJECT_REFCOUNT (buf));
gst_buffer_unref (buf);
this->bufqueue = g_array_remove_index (this->bufqueue, i);
}
/* freeing the array is done in _finalize */
}
GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);
return TRUE;
}
static GstStateChangeReturn
gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition)
static void
gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink)
{
GstMultiFdSink *sink;
GstStateChangeReturn ret;
GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (mhsink);
sink = GST_MULTI_FD_SINK (element);
/* we disallow changing the state from the streaming thread */
if (g_thread_self () == sink->thread)
return GST_STATE_CHANGE_FAILURE;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
if (!gst_multi_fd_sink_start (GST_BASE_SINK (sink)))
goto start_failed;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_NULL:
gst_multi_fd_sink_stop (GST_BASE_SINK (sink));
break;
default:
break;
}
return ret;
/* ERRORS */
start_failed:
{
/* error message was posted */
return GST_STATE_CHANGE_FAILURE;
if (mfsink->fdset) {
gst_poll_free (mfsink->fdset);
mfsink->fdset = NULL;
}
g_hash_table_foreach_remove (mfsink->fd_hash, multifdsink_hash_remove,
mfsink);
}

View file

@ -92,26 +92,17 @@ struct _GstMultiFdSink {
GstMultiHandleSink element;
/*< private >*/
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
gint mode;
GstPoll *fdset;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
gboolean handle_read;
GArray *bufqueue; /* global queue of buffers */
gboolean running; /* the thread state */
GThread *thread; /* the sender thread */
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstTCPUnitType unit_type;/* the type of the units */
@ -121,13 +112,6 @@ struct _GstMultiFdSink {
GstTCPUnitType def_burst_unit;
guint64 def_burst_value;
gboolean resend_streamheader; /* resend streamheader if it changes */
/* stats */
gint buffers_queued; /* number of queued buffers */
gint bytes_queued; /* number of queued bytes */
gint time_queued; /* number of queued time */
guint8 header_flags;
};
@ -145,9 +129,7 @@ struct _GstMultiFdSinkClass {
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
/* vtable */
gboolean (*init) (GstMultiFdSink *sink);
gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set);
gboolean (*close) (GstMultiFdSink *sink);
void (*removed) (GstMultiFdSink *sink, int fd);
/* signals */

View file

@ -161,7 +161,6 @@ enum
#define DEFAULT_BURST_VALUE 0
#define DEFAULT_QOS_DSCP -1
#define DEFAULT_HANDLE_READ TRUE
#define DEFAULT_RESEND_STREAMHEADER TRUE
@ -170,10 +169,12 @@ enum
PROP_0,
#if 0
PROP_MODE,
#endif
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
#if 0
PROP_UNIT_TYPE,
PROP_UNITS_MAX,
PROP_UNITS_SOFT_MAX,
@ -197,11 +198,11 @@ enum
PROP_BURST_VALUE,
PROP_QOS_DSCP,
PROP_HANDLE_READ,
#endif
PROP_RESEND_STREAMHEADER,
#if 0
PROP_NUM_SOCKETS,
#endif
@ -293,9 +294,8 @@ gst_multi_handle_sink_client_status_get_type (void)
return client_status_type;
}
#if 0
static void gst_multi_handle_sink_finalize (GObject * object);
#endif
static void gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink);
#if 0
static void gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink,
@ -309,9 +309,9 @@ static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink);
#endif
static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement *
element, GstStateChange transition);
#endif
static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -340,9 +340,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
gobject_class->set_property = gst_multi_handle_sink_set_property;
gobject_class->get_property = gst_multi_handle_sink_get_property;
#if 0
gobject_class->finalize = gst_multi_handle_sink_finalize;
#endif
#if 0
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
@ -390,6 +388,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
"Recover client when going over this limit (-1 = no limit)", -1,
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
g_param_spec_uint ("buffers-queued", "Buffers queued",
@ -404,7 +403,6 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
#endif
g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
@ -458,6 +456,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
g_param_spec_boolean ("handle-read", "Handle Read",
"Handle client reads and discard the data",
DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
/**
* GstMultiHandleSink::resend-streamheader
*
@ -471,13 +470,12 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
DEFAULT_RESEND_STREAMHEADER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#if 0
g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
#if 0
/**
* GstMultiHandleSink::add:
* @gstmultihandlesink: the multihandlesink element to emit this signal on
@ -638,9 +636,9 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
"Wim Taymans <wim@fluendo.com>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
#if 0
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state);
#if 0
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_render);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock);
@ -653,7 +651,9 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass)
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add_full);
klass->remove = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove);
klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove_flush);
#endif
klass->clear = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_clear);
#if 0
klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_get_stats);
#endif
@ -668,7 +668,9 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
CLIENTS_LOCK_INIT (this);
this->clients = NULL;
#if 0
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
#endif
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->unit_type = DEFAULT_UNIT_TYPE;
@ -685,7 +687,6 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
this->def_burst_value = DEFAULT_BURST_VALUE;
this->qos_dscp = DEFAULT_QOS_DSCP;
this->handle_read = DEFAULT_HANDLE_READ;
this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
@ -693,7 +694,6 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this)
this->cancellable = g_cancellable_new ();
}
#if 0
static void
gst_multi_handle_sink_finalize (GObject * object)
{
@ -702,17 +702,12 @@ gst_multi_handle_sink_finalize (GObject * object)
this = GST_MULTI_HANDLE_SINK (object);
CLIENTS_LOCK_CLEAR (this);
g_hash_table_destroy (this->socket_hash);
g_array_free (this->bufqueue, TRUE);
if (this->cancellable) {
g_object_unref (this->cancellable);
this->cancellable = NULL;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
#if 0
static gint
setup_dscp_client (GstMultiHandleSink * sink, GstSocketClient * client)
{
@ -985,36 +980,46 @@ gst_multi_handle_sink_remove_flush (GstMultiHandleSink * sink, GSocket * socket)
done:
CLIENTS_UNLOCK (sink);
}
#endif
/* can be called both through the signal (i.e. from any thread) or when
* stopping, after the writing thread has shut down */
void
gst_multi_handle_sink_clear (GstMultiHandleSink * sink)
static void
gst_multi_handle_sink_clear (GstMultiHandleSink * mhsink)
{
GList *clients;
GList *clients, *next;
guint32 cookie;
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GST_DEBUG_OBJECT (sink, "clearing all clients");
GST_DEBUG_OBJECT (mhsink, "clearing all clients");
CLIENTS_LOCK (sink);
CLIENTS_LOCK (mhsink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = clients->next) {
GstSocketClient *client;
cookie = mhsink->clients_cookie;
for (clients = mhsink->clients; clients; clients = next) {
GstMultiHandleClient *mhclient;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
if (cookie != mhsink->clients_cookie) {
GST_DEBUG_OBJECT (mhsink, "cookie changed while removing all clients");
goto restart;
}
client = clients->data;
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_handle_sink_remove_client_link (sink, clients);
}
mhclient = (GstMultiHandleClient *) clients->data;
next = g_list_next (clients);
CLIENTS_UNLOCK (sink);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
/* the next call changes the list, which is why we iterate
* with a temporary next pointer */
mhsinkclass->remove_client_link (mhsink, clients);
}
if (mhsinkclass->clear_post)
mhsinkclass->clear_post (mhsink);
CLIENTS_UNLOCK (mhsink);
}
#if 0
/* "get-stats" signal implementation
*/
GstStructure *
@ -1363,13 +1368,13 @@ gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * sink,
}
#endif
#if 0
static gboolean
// FIXME: privatize again ?
gboolean
is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
return TRUE;
}
@ -1381,7 +1386,7 @@ is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer)
* 1 will search forwards.
* Returns: the index or -1 if there is no keyframe after idx.
*/
static gint
gint
find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
{
gint i, len, result;
@ -1406,10 +1411,7 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction)
}
return result;
}
#endif
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
#if 0
/* Get the number of buffers from the buffer queue needed to satisfy
@ -2573,9 +2575,6 @@ gst_multi_handle_sink_set_property (GObject * object, guint prop_id,
multihandlesink->qos_dscp = g_value_get_int (value);
setup_dscp (multihandlesink);
break;
case PROP_HANDLE_READ:
multihandlesink->handle_read = g_value_get_boolean (value);
break;
case PROP_RESEND_STREAMHEADER:
multihandlesink->resend_streamheader = g_value_get_boolean (value);
break;
@ -2612,7 +2611,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
case PROP_BUFFERS_MIN:
g_value_set_int (value, multihandlesink->buffers_min);
break;
#if 0
case PROP_BUFFERS_QUEUED:
g_value_set_uint (value, multihandlesink->buffers_queued);
break;
@ -2622,6 +2620,7 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multihandlesink->time_queued);
break;
#if 0
case PROP_UNIT_TYPE:
g_value_set_enum (value, multihandlesink->unit_type);
break;
@ -2657,9 +2656,6 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
case PROP_QOS_DSCP:
g_value_set_int (value, multihandlesink->qos_dscp);
break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multihandlesink->handle_read);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multihandlesink->resend_streamheader);
break;
@ -2674,6 +2670,39 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id,
}
}
/* create a socket for sending to remote machine */
gboolean
gst_multi_handle_sink_start (GstBaseSink * bsink)
{
GstMultiHandleSinkClass *mhsclass;
GstMultiHandleSink *mhsink;
if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
mhsink = GST_MULTI_HANDLE_SINK (bsink);
mhsclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
if (!mhsclass->start_pre (mhsink))
return FALSE;
mhsink->streamheader = NULL;
mhsink->bytes_to_serve = 0;
mhsink->bytes_served = 0;
if (mhsclass->init) {
mhsclass->init (mhsink);
}
mhsink->running = TRUE;
mhsink->thread = g_thread_new ("multihandlesink",
(GThreadFunc) mhsclass->thread, mhsink);
GST_OBJECT_FLAG_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN);
return TRUE;
}
#if 0
/* create a socket for sending to remote machine */
@ -2736,6 +2765,64 @@ multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
}
#endif
// FIXME: privatize again
gboolean
gst_multi_handle_sink_stop (GstBaseSink * bsink)
{
GstMultiHandleSinkClass *mhclass;
GstBuffer *buf;
gint i;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (bsink);
mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
mhsink->running = FALSE;
mhclass->stop_pre (mhsink);
if (mhsink->thread) {
GST_DEBUG_OBJECT (mhsink, "joining thread");
g_thread_join (mhsink->thread);
GST_DEBUG_OBJECT (mhsink, "joined thread");
mhsink->thread = NULL;
}
/* free the clients */
mhclass->clear (GST_MULTI_HANDLE_SINK (mhsink));
if (mhsink->streamheader) {
g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (mhsink->streamheader);
mhsink->streamheader = NULL;
}
if (mhclass->close)
mhclass->close (mhsink);
mhclass->stop_post (mhsink);
/* remove all queued buffers */
if (mhsink->bufqueue) {
GST_DEBUG_OBJECT (mhsink, "Emptying bufqueue with %d buffers",
mhsink->bufqueue->len);
for (i = mhsink->bufqueue->len - 1; i >= 0; --i) {
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
GST_LOG_OBJECT (mhsink, "Removing buffer %p (%d) with refcount %d", buf,
i, GST_MINI_OBJECT_REFCOUNT (buf));
gst_buffer_unref (buf);
mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
}
/* freeing the array is done in _finalize */
}
GST_OBJECT_FLAG_UNSET (mhsink, GST_MULTI_HANDLE_SINK_OPEN);
return TRUE;
}
#if 0
static gboolean
gst_multi_handle_sink_stop (GstBaseSink * bsink)
@ -2802,7 +2889,6 @@ gst_multi_handle_sink_stop (GstBaseSink * bsink)
}
#endif
#if 0
static GstStateChangeReturn
gst_multi_handle_sink_change_state (GstElement * element,
GstStateChange transition)
@ -2853,6 +2939,7 @@ start_failed:
}
}
#if 0
static gboolean
gst_multi_handle_sink_unlock (GstBaseSink * bsink)
{

View file

@ -123,6 +123,8 @@ typedef enum
/* structure for a client
*/
typedef struct {
gchar debug[30]; /* a debug string used in debug calls to
identify the client */
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
@ -161,10 +163,20 @@ typedef struct {
guint64 last_buffer_ts;
} GstMultiHandleClient;
#define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock))
#define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock))
#define CLIENTS_LOCK(socketsink) (g_rec_mutex_lock(&socketsink->clientslock))
#define CLIENTS_UNLOCK(socketsink) (g_rec_mutex_unlock(&socketsink->clientslock))
// FIXME: remove cast ?
#define CLIENTS_LOCK_INIT(mhsink) (g_rec_mutex_init(&(GST_MULTI_HANDLE_SINK(mhsink))->clientslock))
#define CLIENTS_LOCK_CLEAR(mhsink) (g_rec_mutex_clear(&(GST_MULTI_HANDLE_SINK(mhsink))->clientslock))
#define CLIENTS_LOCK(mhsink) (g_rec_mutex_lock(&(GST_MULTI_HANDLE_SINK(mhsink))->clientslock))
#define CLIENTS_UNLOCK(mhsink) (g_rec_mutex_unlock(&(GST_MULTI_HANDLE_SINK(mhsink))->clientslock))
// FIXME: internalize in .c file ?
gint
find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction);
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
gboolean is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer);
gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink);
gboolean gst_multi_handle_sink_start (GstBaseSink * bsink);
/**
* GstMultiHandleSink:
@ -180,7 +192,6 @@ struct _GstMultiHandleSink {
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *socket_hash; /* index on socket to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
GMainContext *main_context;
@ -191,7 +202,6 @@ struct _GstMultiHandleSink {
guint mtu;
gint qos_dscp;
gboolean handle_read;
GArray *bufqueue; /* global queue of buffers */
@ -238,7 +248,14 @@ struct _GstMultiHandleSinkClass {
void (*remove) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiHandleSink *sink, GSocket *socket);
void (*clear) (GstMultiHandleSink *sink);
void (*clear_post) (GstMultiHandleSink *sink);
void (*stop_pre) (GstMultiHandleSink *sink);
void (*stop_post) (GstMultiHandleSink *sink);
gboolean (*start_pre) (GstMultiHandleSink *sink);
gpointer (*thread) (GstMultiHandleSink *sink);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_client_link) (GstMultiHandleSink * sink, GList * link);
/* vtable */
gboolean (*init) (GstMultiHandleSink *sink);

2803
gst/tcp/gstmultioutputsink.c Normal file

File diff suppressed because it is too large Load diff

View file

@ -154,17 +154,11 @@ enum
#define DEFAULT_BURST_VALUE 0
#define DEFAULT_QOS_DSCP -1
#define DEFAULT_HANDLE_READ TRUE
#define DEFAULT_RESEND_STREAMHEADER TRUE
enum
{
PROP_0,
PROP_MODE,
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
PROP_TIME_QUEUED,
PROP_UNIT_TYPE,
PROP_UNITS_MAX,
@ -178,19 +172,19 @@ enum
PROP_QOS_DSCP,
PROP_HANDLE_READ,
PROP_RESEND_STREAMHEADER,
PROP_NUM_SOCKETS,
PROP_LAST
};
static void gst_multi_socket_sink_finalize (GObject * object);
static void gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link);
static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
GIOCondition condition, GstMultiSocketSink * sink);
@ -199,8 +193,6 @@ static GstFlowReturn gst_multi_socket_sink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
static GstStateChangeReturn gst_multi_socket_sink_change_state (GstElement *
element, GstStateChange transition);
static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@ -256,21 +248,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
g_param_spec_uint ("buffers-queued", "Buffers queued",
"Number of buffers currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#if NOT_IMPLEMENTED
g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
g_param_spec_uint ("bytes-queued", "Bytes queued",
"Number of bytes currently queued", 0, G_MAXUINT, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
g_param_spec_uint64 ("time-queued", "Time queued",
"Number of time currently queued", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
#endif
g_object_class_install_property (gobject_class, PROP_BURST_FORMAT,
g_param_spec_enum ("burst-format", "Burst format",
"The format of the burst units (when sync-method is burst[[-with]-keyframe])",
@ -287,30 +264,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
-1, 63, DEFAULT_QOS_DSCP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiSocketSink::handle-read
*
* Handle read requests from clients and discard the data.
*
* Since: 0.10.23
*/
g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
g_param_spec_boolean ("handle-read", "Handle Read",
"Handle client reads and discard the data",
DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiSocketSink::resend-streamheader
*
* Resend the streamheaders to existing clients when they change.
*
* Since: 0.10.23
*/
g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
"Resend the streamheader if it changes in the caps",
DEFAULT_RESEND_STREAMHEADER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_SOCKETS,
g_param_spec_uint ("num-sockets", "Number of sockets",
"The current number of client sockets",
@ -460,16 +413,23 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
"Wim Taymans <wim@fluendo.com>, "
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_change_state);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_render);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
gstmultihandlesink_class->clear =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_clear);
gstmultihandlesink_class->stop_pre =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
gstmultihandlesink_class->stop_post =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
gstmultihandlesink_class->start_pre =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
gstmultihandlesink_class->thread =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
gstmultihandlesink_class->remove_client_link =
GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link);
klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
@ -484,13 +444,8 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
static void
gst_multi_socket_sink_init (GstMultiSocketSink * this)
{
GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);
CLIENTS_LOCK_INIT (this);
this->clients = NULL;
this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal);
this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *));
this->unit_type = DEFAULT_UNIT_TYPE;
this->units_max = DEFAULT_UNITS_MAX;
this->units_soft_max = DEFAULT_UNITS_SOFT_MAX;
@ -499,9 +454,6 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
this->def_burst_value = DEFAULT_BURST_VALUE;
this->qos_dscp = DEFAULT_QOS_DSCP;
this->handle_read = DEFAULT_HANDLE_READ;
this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
this->header_flags = 0;
this->cancellable = g_cancellable_new ();
@ -510,14 +462,9 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
static void
gst_multi_socket_sink_finalize (GObject * object)
{
GstMultiSocketSink *this;
GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
this = GST_MULTI_SOCKET_SINK (object);
CLIENTS_LOCK_CLEAR (this);
g_hash_table_destroy (this->socket_hash);
g_array_free (this->bufqueue, TRUE);
if (this->cancellable) {
g_object_unref (this->cancellable);
this->cancellable = NULL;
@ -595,9 +542,10 @@ static void
setup_dscp (GstMultiSocketSink * sink)
{
GList *clients;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
CLIENTS_LOCK (sink);
for (clients = sink->clients; clients; clients = clients->next) {
for (clients = mhsink->clients; clients; clients = clients->next) {
GstSocketClient *client;
client = clients->data;
@ -616,6 +564,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
GstSocketClient *client;
GstMultiHandleClient *mhclient;
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, "
"min_format %d, min_value %" G_GUINT64_FORMAT
@ -632,6 +581,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
client = g_new0 (GstSocketClient, 1);
mhclient = (GstMultiHandleClient *) client;
gst_multi_handle_sink_client_init (mhclient, sync_method);
g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
client->socket = G_SOCKET (g_object_ref (socket));
client->burst_min_format = min_format;
@ -647,9 +597,9 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
goto duplicate;
/* we can add the fd now */
clink = sink->clients = g_list_prepend (sink->clients, client);
clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
g_hash_table_insert (sink->socket_hash, socket, clink);
sink->clients_cookie++;
mhsink->clients_cookie++;
/* set the socket to non blocking */
g_socket_set_blocking (socket, FALSE);
@ -714,6 +664,9 @@ void
gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
{
GList *clink;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket);
@ -731,7 +684,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
}
mhclient->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
} else {
GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
socket);
@ -777,40 +730,6 @@ done:
CLIENTS_UNLOCK (sink);
}
/* can be called both through the signal (i.e. from any thread) or when
* stopping, after the writing thread has shut down */
void
gst_multi_socket_sink_clear (GstMultiHandleSink * mhsink)
{
GList *clients, *next;
guint32 cookie;
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GST_DEBUG_OBJECT (sink, "clearing all clients");
CLIENTS_LOCK (sink);
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
GstMultiHandleClient *mhclient;
if (cookie != sink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients");
goto restart;
}
mhclient = (GstMultiHandleClient *) clients->data;
next = g_list_next (clients);
mhclient->status = GST_CLIENT_STATUS_REMOVED;
/* the next call changes the list, which is why we iterate
* with a temporary next pointer */
gst_multi_socket_sink_remove_client_link (sink, clients);
}
CLIENTS_UNLOCK (sink);
}
/* "get-stats" signal implementation
*/
GstStructure *
@ -871,13 +790,14 @@ noclient:
* close the fd itself.
*/
static void
gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
GList * link)
{
GSocket *socket;
GTimeVal now;
GstSocketClient *client = link->data;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (sink);
GstMultiSocketSinkClass *fclass;
fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink);
@ -885,8 +805,8 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
socket = client->socket;
if (mhclient->currently_removing) {
GST_WARNING_OBJECT (sink, "[socket %p] client is already being removed",
socket);
GST_WARNING_OBJECT (sink, "%s client is already being removed",
mhclient->debug);
return;
} else {
mhclient->currently_removing = TRUE;
@ -895,31 +815,31 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
/* FIXME: if we keep track of ip we can log it here and signal */
switch (mhclient->status) {
case GST_CLIENT_STATUS_OK:
GST_WARNING_OBJECT (sink, "[socket %p] removing client %p for no reason",
socket, client);
GST_WARNING_OBJECT (sink, "%s removing client %p for no reason",
mhclient->debug, client);
break;
case GST_CLIENT_STATUS_CLOSED:
GST_DEBUG_OBJECT (sink, "[socket %p] removing client %p because of close",
socket, client);
GST_DEBUG_OBJECT (sink, "%s removing client %p because of close",
mhclient->debug, client);
break;
case GST_CLIENT_STATUS_REMOVED:
GST_DEBUG_OBJECT (sink,
"[socket %p] removing client %p because the app removed it", socket,
"%s removing client %p because the app removed it", mhclient->debug,
client);
break;
case GST_CLIENT_STATUS_SLOW:
GST_INFO_OBJECT (sink,
"[socket %p] removing client %p because it was too slow", socket,
"%s removing client %p because it was too slow", mhclient->debug,
client);
break;
case GST_CLIENT_STATUS_ERROR:
GST_WARNING_OBJECT (sink,
"[socket %p] removing client %p because of error", socket, client);
"%s removing client %p because of error", mhclient->debug, client);
break;
case GST_CLIENT_STATUS_FLUSHING:
default:
GST_WARNING_OBJECT (sink,
"[socket %p] removing client %p with invalid reason %d", socket,
"%s removing client %p with invalid reason %d", mhclient->debug,
client, mhclient->status);
break;
}
@ -955,7 +875,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
/* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (sink->socket_hash, socket)) {
if (!g_hash_table_remove (mssink->socket_hash, socket)) {
GST_WARNING_OBJECT (sink,
"[socket %p] error removing client %p from hash", socket, client);
}
@ -968,7 +888,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink,
sink->clients_cookie++;
if (fclass->removed)
fclass->removed (sink, socket);
fclass->removed (mssink, socket);
g_free (client);
CLIENTS_UNLOCK (sink);
@ -1036,24 +956,13 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
return ret;
}
static gboolean
is_sync_frame (GstMultiSocketSink * sink, GstBuffer * buffer)
{
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
} else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
return TRUE;
}
return FALSE;
}
/* queue the given buffer for the given client */
static gboolean
gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
GstSocketClient * client, GstBuffer * buffer)
{
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstCaps *caps;
/* TRUE: send them if the new caps have them */
@ -1093,7 +1002,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
send_streamheader = TRUE;
} else {
/* both old and new caps have streamheader set */
if (!sink->resend_streamheader) {
if (!mhsink->resend_streamheader) {
GST_DEBUG_OBJECT (sink,
"[socket %p] asked to not resend the streamheader, not sending",
client->socket);
@ -1168,40 +1077,6 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink,
return TRUE;
}
/* find the keyframe in the list of buffers starting the
* search from @idx. @direction as -1 will search backwards,
* 1 will search forwards.
* Returns: the index or -1 if there is no keyframe after idx.
*/
static gint
find_syncframe (GstMultiSocketSink * sink, gint idx, gint direction)
{
gint i, len, result;
/* take length of queued buffers */
len = sink->bufqueue->len;
/* assume we don't find a keyframe */
result = -1;
/* then loop over all buffers to find the first keyframe */
for (i = idx; i >= 0 && i < len; i += direction) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d",
i, idx, direction);
result = i;
break;
}
}
return result;
}
#define find_next_syncframe(s,i) find_syncframe(s,i,1)
#define find_prev_syncframe(s,i) find_syncframe(s,i,-1)
/* Get the number of buffers from the buffer queue needed to satisfy
* the maximum max in the configured units.
* If units are not BUFFERS, and there are insufficient buffers in the
@ -1209,6 +1084,8 @@ find_syncframe (GstMultiSocketSink * sink, gint idx, gint direction)
static gint
get_buffers_max (GstMultiSocketSink * sink, gint64 max)
{
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
switch (sink->unit_type) {
case GST_FORMAT_BUFFERS:
return max;
@ -1220,10 +1097,10 @@ get_buffers_max (GstMultiSocketSink * sink, gint64 max)
gint64 diff;
GstClockTime first = GST_CLOCK_TIME_NONE;
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
if (first == -1)
first = GST_BUFFER_TIMESTAMP (buf);
@ -1243,10 +1120,10 @@ get_buffers_max (GstMultiSocketSink * sink, gint64 max)
int len;
gint acc = 0;
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
for (i = 0; i < len; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
acc += gst_buffer_get_size (buf);
if (acc > max)
@ -1279,9 +1156,10 @@ find_limits (GstMultiSocketSink * sink,
GstClockTime first, time;
gint i, len, bytes;
gboolean result, max_hit;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
/* take length of queue */
len = sink->bufqueue->len;
len = mhsink->bufqueue->len;
/* this must hold */
g_assert (len > 0);
@ -1328,7 +1206,7 @@ find_limits (GstMultiSocketSink * sink,
result = *min_idx != -1;
break;
}
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
bytes += gst_buffer_get_size (buf);
@ -1441,12 +1319,13 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
{
gint result;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GST_DEBUG_OBJECT (sink,
"[socket %p] new client, deciding where to start in queue",
client->socket);
GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long",
sink->bufqueue->len);
mhsink->bufqueue->len);
switch (mhclient->sync_method) {
case GST_SYNC_METHOD_LATEST:
/* no syncing, we are happy with whatever the client is going to get */
@ -1463,7 +1342,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
"[socket %p] new client, bufpos %d, waiting for keyframe",
client->socket, mhclient->bufpos);
result = find_prev_syncframe (sink, mhclient->bufpos);
result = find_prev_syncframe (mhsink, mhclient->bufpos);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
"[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d",
@ -1489,7 +1368,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
* we need to wait for the next keyframe and so we change the client's
* sync method to GST_SYNC_METHOD_NEXT_KEYFRAME.
*/
result = find_next_syncframe (sink, 0);
result = find_next_syncframe (mhsink, 0);
if (result != -1) {
GST_DEBUG_OBJECT (sink,
"[socket %p] SYNC_METHOD_LATEST_KEYFRAME: result %d",
@ -1554,7 +1433,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
/* first find a keyframe after min_idx */
next_syncframe = find_next_syncframe (sink, min_idx);
next_syncframe = find_next_syncframe (mhsink, min_idx);
if (next_syncframe != -1 && next_syncframe < max_idx) {
/* we have a valid keyframe and it's below the max */
GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
@ -1563,7 +1442,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
}
/* no valid keyframe, try to find one below min */
prev_syncframe = find_prev_syncframe (sink, min_idx);
prev_syncframe = find_prev_syncframe (mhsink, min_idx);
if (prev_syncframe != -1) {
GST_WARNING_OBJECT (sink,
"using keyframe below min in BURST_KEYFRAME sync mode");
@ -1601,7 +1480,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink,
GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx);
/* first find a keyframe after min_idx */
next_syncframe = find_next_syncframe (sink, min_idx);
next_syncframe = find_next_syncframe (mhsink, min_idx);
if (next_syncframe != -1 && next_syncframe < max_idx) {
/* we have a valid keyframe and it's below the max */
GST_LOG_OBJECT (sink, "found keyframe in min/max limits");
@ -1720,7 +1599,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
goto flushed;
/* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, mhclient->bufpos);
buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
mhclient->bufpos--;
/* update stats */
@ -1856,14 +1735,14 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink,
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* find keyframe in buffers, we search backwards to find the
* closest keyframe relative to what this client already received. */
newbufpos = MIN (sink->bufqueue->len - 1,
newbufpos = MIN (mhsink->bufqueue->len - 1,
get_buffers_max (sink, sink->units_soft_max) - 1);
while (newbufpos >= 0) {
GstBuffer *buf;
buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos);
if (is_sync_frame (sink, buf)) {
buf = g_array_index (mhsink->bufqueue, GstBuffer *, newbufpos);
if (is_sync_frame (mhsink, buf)) {
/* found a buffer that is not a delta unit */
break;
}
@ -1907,9 +1786,9 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
GstClockTime now;
gint max_buffers, soft_max_buffers;
guint cookie;
GstMultiHandleSink *mhsink;
mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
@ -1917,8 +1796,8 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
CLIENTS_LOCK (sink);
/* add buffer to queue */
gst_buffer_ref (buf);
g_array_prepend_val (sink->bufqueue, buf);
queuelen = sink->bufqueue->len;
g_array_prepend_val (mhsink->bufqueue, buf);
queuelen = mhsink->bufqueue->len;
if (sink->units_max > 0)
max_buffers = get_buffers_max (sink, sink->units_max);
@ -1936,12 +1815,12 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf)
max_buffer_usage = 0;
restart:
cookie = sink->clients_cookie;
for (clients = sink->clients; clients; clients = next) {
cookie = mhsink->clients_cookie;
for (clients = mhsink->clients; clients; clients = next) {
GstSocketClient *client;
GstMultiHandleClient *mhclient;
if (cookie != sink->clients_cookie) {
if (cookie != mhsink->clients_cookie) {
GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting");
goto restart;
}
@ -1982,7 +1861,7 @@ restart:
mhclient->status = GST_CLIENT_STATUS_SLOW;
/* set client to invalid position while being removed */
mhclient->bufpos = -1;
gst_multi_socket_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
continue;
} else if (mhclient->bufpos == 0 || mhclient->new_connection) {
/* can send data to this client now. need to signal the select thread that
@ -2041,8 +1920,8 @@ restart:
"extending queue to include sync point, now at %d, limit is %d",
max_buffer_usage, limit);
for (i = 0; i < limit; i++) {
buf = g_array_index (sink->bufqueue, GstBuffer *, i);
if (is_sync_frame (sink, buf)) {
buf = g_array_index (mhsink->bufqueue, GstBuffer *, i);
if (is_sync_frame (mhsink, buf)) {
/* found a sync frame, now extend the buffer usage to
* include at least this frame. */
max_buffer_usage = MAX (max_buffer_usage, i);
@ -2062,14 +1941,14 @@ restart:
/* queue exceeded max size */
queuelen--;
old = g_array_index (sink->bufqueue, GstBuffer *, i);
sink->bufqueue = g_array_remove_index (sink->bufqueue, i);
old = g_array_index (mhsink->bufqueue, GstBuffer *, i);
mhsink->bufqueue = g_array_remove_index (mhsink->bufqueue, i);
/* unref tail buffer */
gst_buffer_unref (old);
}
/* save for stats */
sink->buffers_queued = max_buffer_usage;
mhsink->buffers_queued = max_buffer_usage;
CLIENTS_UNLOCK (sink);
}
@ -2085,6 +1964,9 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
GstSocketClient *client;
gboolean ret = TRUE;
GstMultiHandleClient *mhclient;
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->socket_hash, socket);
@ -2098,7 +1980,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
&& mhclient->status != GST_CLIENT_STATUS_OK) {
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
}
@ -2106,25 +1988,25 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
if ((condition & G_IO_ERR)) {
GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket);
mhclient->status = GST_CLIENT_STATUS_ERROR;
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
} else if ((condition & G_IO_HUP)) {
mhclient->status = GST_CLIENT_STATUS_CLOSED;
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
} else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
/* handle client read */
if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
}
} else if ((condition & G_IO_OUT)) {
/* handle client write */
if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
gst_multi_socket_sink_remove_client_link (sink, clink);
mhsinkclass->remove_client_link (mhsink, clink);
ret = FALSE;
goto done;
}
@ -2142,15 +2024,15 @@ gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
GstClockTime now;
GTimeVal nowtv;
GList *clients;
GstMultiHandleSink *mhsink;
mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
GstMultiHandleSinkClass *mhsinkclass =
GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv);
CLIENTS_LOCK (sink);
for (clients = sink->clients; clients; clients = clients->next) {
for (clients = mhsink->clients; clients; clients = clients->next) {
GstSocketClient *client;
GstMultiHandleClient *mhclient;
@ -2159,7 +2041,7 @@ gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
if (mhsink->timeout > 0
&& now - mhclient->last_activity_time > mhsink->timeout) {
mhclient->status = GST_CLIENT_STATUS_SLOW;
gst_multi_socket_sink_remove_client_link (sink, clients);
mhsinkclass->remove_client_link (mhsink, clients);
}
}
CLIENTS_UNLOCK (sink);
@ -2170,14 +2052,12 @@ gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
/* we handle the client communication in another thread so that we do not block
* the gstreamer thread while we select() on the client fds */
static gpointer
gst_multi_socket_sink_thread (GstMultiSocketSink * sink)
gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
{
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
GSource *timeout = NULL;
GstMultiHandleSink *mhsink;
mhsink = GST_MULTI_HANDLE_SINK (sink);
while (sink->running) {
while (mhsink->running) {
if (mhsink->timeout > 0) {
timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
@ -2216,7 +2096,7 @@ gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
mhsink = GST_MULTI_HANDLE_SINK (sink);
g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink,
GST_MULTI_SOCKET_SINK_OPEN), GST_FLOW_FLUSHING);
GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING);
#if 0
/* since we check every buffer for streamheader caps, we need to make
@ -2272,9 +2152,9 @@ gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
if (in_caps && sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new HEADER buffers, clearing old streamheader");
g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
g_slist_foreach (mhsink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (mhsink->streamheader);
mhsink->streamheader = NULL;
}
/* save the current in_caps */
@ -2291,7 +2171,7 @@ gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf)
if (in_caps) {
GST_DEBUG_OBJECT (sink, "appending HEADER buffer with length %"
G_GSIZE_FORMAT " to streamheader", gst_buffer_get_size (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
mhsink->streamheader = g_slist_append (mhsink->streamheader, buf);
} else {
/* queue the buffer, this is a regular data buffer. */
gst_multi_socket_sink_queue_buffer (sink, buf);
@ -2345,12 +2225,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
multisocketsink->qos_dscp = g_value_get_int (value);
setup_dscp (multisocketsink);
break;
case PROP_HANDLE_READ:
multisocketsink->handle_read = g_value_get_boolean (value);
break;
case PROP_RESEND_STREAMHEADER:
multisocketsink->resend_streamheader = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -2373,15 +2247,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
case PROP_BUFFERS_SOFT_MAX:
g_value_set_int (value, multisocketsink->units_soft_max);
break;
case PROP_BUFFERS_QUEUED:
g_value_set_uint (value, multisocketsink->buffers_queued);
break;
case PROP_BYTES_QUEUED:
g_value_set_uint (value, multisocketsink->bytes_queued);
break;
case PROP_TIME_QUEUED:
g_value_set_uint64 (value, multisocketsink->time_queued);
break;
case PROP_UNIT_TYPE:
g_value_set_enum (value, multisocketsink->unit_type);
break;
@ -2400,12 +2265,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
case PROP_QOS_DSCP:
g_value_set_int (value, multisocketsink->qos_dscp);
break;
case PROP_HANDLE_READ:
g_value_set_boolean (value, multisocketsink->handle_read);
break;
case PROP_RESEND_STREAMHEADER:
g_value_set_boolean (value, multisocketsink->resend_streamheader);
break;
case PROP_NUM_SOCKETS:
g_value_set_uint (value,
g_hash_table_size (multisocketsink->socket_hash));
@ -2417,30 +2276,18 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_multi_socket_sink_start (GstBaseSink * bsink)
gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
{
GstMultiSocketSinkClass *fclass;
GstMultiSocketSink *this;
GstMultiHandleSink *mhsink;
GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
GList *clients;
if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
GST_INFO_OBJECT (mssink, "starting");
this = GST_MULTI_SOCKET_SINK (bsink);
mhsink = GST_MULTI_HANDLE_SINK (bsink);
fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this);
mssink->main_context = g_main_context_new ();
GST_INFO_OBJECT (this, "starting");
this->main_context = g_main_context_new ();
CLIENTS_LOCK (this);
for (clients = this->clients; clients; clients = clients->next) {
CLIENTS_LOCK (mssink);
for (clients = mhsink->clients; clients; clients = clients->next) {
GstSocketClient *client;
client = clients->data;
@ -2448,152 +2295,46 @@ gst_multi_socket_sink_start (GstBaseSink * bsink)
continue;
client->source =
g_socket_create_source (client->socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, this->cancellable);
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
mssink->cancellable);
g_source_set_callback (client->source,
(GSourceFunc) gst_multi_socket_sink_socket_condition,
gst_object_ref (this), (GDestroyNotify) gst_object_unref);
g_source_attach (client->source, this->main_context);
gst_object_ref (mssink), (GDestroyNotify) gst_object_unref);
g_source_attach (client->source, mssink->main_context);
}
CLIENTS_UNLOCK (this);
this->streamheader = NULL;
mhsink->bytes_to_serve = 0;
mhsink->bytes_served = 0;
if (fclass->init) {
fclass->init (this);
}
this->running = TRUE;
this->thread = g_thread_new ("multisocketsink",
(GThreadFunc) gst_multi_socket_sink_thread, this);
GST_OBJECT_FLAG_SET (this, GST_MULTI_HANDLE_SINK_OPEN);
CLIENTS_UNLOCK (mssink);
return TRUE;
}
static gboolean
multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
{
return TRUE;
}
static gboolean
gst_multi_socket_sink_stop (GstBaseSink * bsink)
static void
gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
{
GstMultiSocketSinkClass *fclass;
GstMultiHandleSinkClass *mhclass;
GstMultiSocketSink *this;
GstBuffer *buf;
gint i;
GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
this = GST_MULTI_SOCKET_SINK (bsink);
fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this);
mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (this);
if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN))
return TRUE;
this->running = FALSE;
if (this->main_context)
g_main_context_wakeup (this->main_context);
if (this->thread) {
GST_DEBUG_OBJECT (this, "joining thread");
g_thread_join (this->thread);
GST_DEBUG_OBJECT (this, "joined thread");
this->thread = NULL;
}
/* free the clients */
mhclass->clear (GST_MULTI_HANDLE_SINK (this));
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
this->streamheader = NULL;
}
if (fclass->close)
fclass->close (this);
if (this->main_context) {
g_main_context_unref (this->main_context);
this->main_context = NULL;
}
g_hash_table_foreach_remove (this->socket_hash, multisocketsink_hash_remove,
this);
/* remove all queued buffers */
if (this->bufqueue) {
GST_DEBUG_OBJECT (this, "Emptying bufqueue with %d buffers",
this->bufqueue->len);
for (i = this->bufqueue->len - 1; i >= 0; --i) {
buf = g_array_index (this->bufqueue, GstBuffer *, i);
GST_LOG_OBJECT (this, "Removing buffer %p (%d) with refcount %d", buf, i,
GST_MINI_OBJECT_REFCOUNT (buf));
gst_buffer_unref (buf);
this->bufqueue = g_array_remove_index (this->bufqueue, i);
}
/* freeing the array is done in _finalize */
}
GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN);
return TRUE;
if (mssink->main_context)
g_main_context_wakeup (mssink->main_context);
}
static GstStateChangeReturn
gst_multi_socket_sink_change_state (GstElement * element,
GstStateChange transition)
static void
gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
{
GstMultiSocketSink *sink;
GstStateChangeReturn ret;
GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
sink = GST_MULTI_SOCKET_SINK (element);
/* we disallow changing the state from the streaming thread */
if (g_thread_self () == sink->thread)
return GST_STATE_CHANGE_FAILURE;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
if (!gst_multi_socket_sink_start (GST_BASE_SINK (sink)))
goto start_failed;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
default:
break;
if (mssink->main_context) {
g_main_context_unref (mssink->main_context);
mssink->main_context = NULL;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_NULL:
gst_multi_socket_sink_stop (GST_BASE_SINK (sink));
break;
default:
break;
}
return ret;
/* ERRORS */
start_failed:
{
/* error message was posted */
return GST_STATE_CHANGE_FAILURE;
}
g_hash_table_foreach_remove (mssink->socket_hash, multisocketsink_hash_remove,
mssink);
}
static gboolean

View file

@ -75,25 +75,15 @@ struct _GstMultiSocketSink {
GstMultiHandleSink element;
/*< private >*/
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *socket_hash; /* index on socket to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
GMainContext *main_context;
GCancellable *cancellable;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
gboolean handle_read;
GArray *bufqueue; /* global queue of buffers */
gboolean running; /* the thread state */
GThread *thread; /* the sender thread */
/* these values are used to check if a client is reading fast
* enough and to control receovery */
@ -104,13 +94,6 @@ struct _GstMultiSocketSink {
GstFormat def_burst_format;
guint64 def_burst_value;
gboolean resend_streamheader; /* resend streamheader if it changes */
/* stats */
gint buffers_queued; /* number of queued buffers */
gint bytes_queued; /* number of queued bytes */
gint time_queued; /* number of queued time */
guint8 header_flags;
};
@ -128,8 +111,6 @@ struct _GstMultiSocketSinkClass {
GstStructure* (*get_stats) (GstMultiSocketSink *sink, GSocket *socket);
/* vtable */
gboolean (*init) (GstMultiSocketSink *sink);
gboolean (*close) (GstMultiSocketSink *sink);
void (*removed) (GstMultiSocketSink *sink, GSocket *socket);
/* signals */
@ -141,7 +122,7 @@ struct _GstMultiSocketSinkClass {
GType gst_multi_socket_sink_get_type (void);
void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket);

View file

@ -57,8 +57,8 @@ enum
static void gst_tcp_server_sink_finalize (GObject * gobject);
static gboolean gst_tcp_server_sink_init_send (GstMultiSocketSink * this);
static gboolean gst_tcp_server_sink_close (GstMultiSocketSink * this);
static gboolean gst_tcp_server_sink_init_send (GstMultiHandleSink * this);
static gboolean gst_tcp_server_sink_close (GstMultiHandleSink * this);
static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink,
GSocket * socket);
@ -76,10 +76,12 @@ gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstMultiHandleSinkClass *gstmultihandlesink_class;
GstMultiSocketSinkClass *gstmultifdsink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
gstmultifdsink_class = (GstMultiSocketSinkClass *) klass;
gobject_class->set_property = gst_tcp_server_sink_set_property;
@ -99,8 +101,8 @@ gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass)
"Send data as a server over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
gstmultifdsink_class->init = gst_tcp_server_sink_init_send;
gstmultifdsink_class->close = gst_tcp_server_sink_close;
gstmultihandlesink_class->init = gst_tcp_server_sink_init_send;
gstmultihandlesink_class->close = gst_tcp_server_sink_close;
gstmultifdsink_class->removed = gst_tcp_server_sink_removed;
GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink");
@ -263,7 +265,7 @@ gst_tcp_server_sink_get_property (GObject * object, guint prop_id,
/* create a socket for sending to remote machine */
static gboolean
gst_tcp_server_sink_init_send (GstMultiSocketSink * parent)
gst_tcp_server_sink_init_send (GstMultiHandleSink * parent)
{
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
GError *err = NULL;
@ -371,7 +373,7 @@ bind_failed:
}
g_clear_error (&err);
g_object_unref (saddr);
gst_tcp_server_sink_close (&this->element);
gst_tcp_server_sink_close (GST_MULTI_HANDLE_SINK (&this->element));
return FALSE;
}
listen_failed:
@ -384,13 +386,13 @@ listen_failed:
this->server_port, err->message));
}
g_clear_error (&err);
gst_tcp_server_sink_close (&this->element);
gst_tcp_server_sink_close (GST_MULTI_HANDLE_SINK (&this->element));
return FALSE;
}
}
static gboolean
gst_tcp_server_sink_close (GstMultiSocketSink * parent)
gst_tcp_server_sink_close (GstMultiHandleSink * parent)
{
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);