diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index b94682c69d..7c20dc661b 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -17,6 +17,7 @@ libgsttcp_la_SOURCES = \ gsttcpplugin.c \ gsttcpclientsrc.c gsttcpclientsink.c \ gstmultifdsink.c \ + gstmultihandlesink.c \ gstmultisocketsink.c \ gsttcpserversrc.c gsttcpserversink.c diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index c53df680f3..1f4dd15c2a 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -141,7 +141,6 @@ enum SIGNAL_ADD_BURST, SIGNAL_REMOVE, SIGNAL_REMOVE_FLUSH, - SIGNAL_CLEAR, SIGNAL_GET_STATS, /* signals */ @@ -157,15 +156,9 @@ enum #define DEFAULT_MODE 1 #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_TIME_MIN -1 -#define DEFAULT_BYTES_MIN -1 -#define DEFAULT_BUFFERS_MIN -1 #define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 -#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE -#define DEFAULT_TIMEOUT 0 -#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST #define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED #define DEFAULT_BURST_VALUE 0 @@ -190,16 +183,6 @@ enum PROP_BUFFERS_MAX, PROP_BUFFERS_SOFT_MAX, - PROP_TIME_MIN, - PROP_BYTES_MIN, - PROP_BUFFERS_MIN, - - PROP_RECOVER_POLICY, - PROP_TIMEOUT, - PROP_SYNC_METHOD, - PROP_BYTES_TO_SERVE, - PROP_BYTES_SERVED, - PROP_BURST_UNIT, PROP_BURST_VALUE, @@ -234,59 +217,6 @@ gst_fdset_mode_get_type (void) return fdset_mode_type; } -#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) -static GType -gst_recover_policy_get_type (void) -{ - static GType recover_policy_type = 0; - static const GEnumValue recover_policy[] = { - {GST_RECOVER_POLICY_NONE, - "Do not try to recover", "none"}, - {GST_RECOVER_POLICY_RESYNC_LATEST, - "Resync client to latest buffer", "latest"}, - {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, - "Resync client to soft limit", "soft-limit"}, - {GST_RECOVER_POLICY_RESYNC_KEYFRAME, - "Resync client to most recent keyframe", "keyframe"}, - {0, NULL, NULL}, - }; - - if (!recover_policy_type) { - recover_policy_type = - g_enum_register_static ("GstRecoverPolicy", recover_policy); - } - return recover_policy_type; -} - -#define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type()) -static GType -gst_sync_method_get_type (void) -{ - static GType sync_method_type = 0; - static const GEnumValue sync_method[] = { - {GST_SYNC_METHOD_LATEST, - "Serve starting from the latest buffer", "latest"}, - {GST_SYNC_METHOD_NEXT_KEYFRAME, - "Serve starting from the next keyframe", "next-keyframe"}, - {GST_SYNC_METHOD_LATEST_KEYFRAME, - "Serve everything since the latest keyframe (burst)", - "latest-keyframe"}, - {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"}, - {GST_SYNC_METHOD_BURST_KEYFRAME, - "Serve burst-value data starting on a keyframe", - "burst-keyframe"}, - {GST_SYNC_METHOD_BURST_WITH_KEYFRAME, - "Serve burst-value data preferably starting on a keyframe", - "burst-with-keyframe"}, - {0, NULL, NULL}, - }; - - if (!sync_method_type) { - sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method); - } - return sync_method_type; -} - #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) static GType gst_unit_type_get_type (void) @@ -306,29 +236,6 @@ gst_unit_type_get_type (void) return unit_type_type; } -#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) -static GType -gst_client_status_get_type (void) -{ - static GType client_status_type = 0; - static const GEnumValue client_status[] = { - {GST_CLIENT_STATUS_OK, "ok", "ok"}, - {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"}, - {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"}, - {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"}, - {GST_CLIENT_STATUS_ERROR, "Error", "error"}, - {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"}, - {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"}, - {0, NULL, NULL}, - }; - - if (!client_status_type) { - client_status_type = - g_enum_register_static ("GstClientStatus", client_status); - } - return client_status_type; -} - static void gst_multi_fd_sink_finalize (GObject * object); static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, @@ -345,7 +252,7 @@ static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); #define gst_multi_fd_sink_parent_class parent_class -G_DEFINE_TYPE (GstMultiFdSink, gst_multi_fd_sink, GST_TYPE_BASE_SINK); +G_DEFINE_TYPE (GstMultiFdSink, gst_multi_fd_sink, GST_TYPE_MULTI_HANDLE_SINK); static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 }; @@ -355,10 +262,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GObjectClass *gobject_class; GstElementClass *gstelement_class; GstBaseSinkClass *gstbasesink_class; + GstMultiHandleSinkClass *gstmultihandlesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; + gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass; gobject_class->set_property = gst_multi_fd_sink_set_property; gobject_class->get_property = gst_multi_fd_sink_get_property; @@ -389,22 +298,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_MIN, - g_param_spec_int ("bytes-min", "Bytes min", - "min number of bytes to queue (-1 = as little as possible)", -1, - G_MAXINT, DEFAULT_BYTES_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIME_MIN, - g_param_spec_int64 ("time-min", "Time min", - "min number of time to queue (-1 = as little as possible)", -1, - G_MAXINT64, DEFAULT_TIME_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN, - g_param_spec_int ("buffers-min", "Buffers min", - "min number of buffers to queue (-1 = as few as possible)", -1, - G_MAXINT, DEFAULT_BUFFERS_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", @@ -435,29 +328,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); #endif - g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY, - g_param_spec_enum ("recover-policy", "Recover Policy", - "How to recover when client reaches the soft max", - GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIMEOUT, - g_param_spec_uint64 ("timeout", "Timeout", - "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)", - 0, G_MAXUINT64, DEFAULT_TIMEOUT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SYNC_METHOD, - g_param_spec_enum ("sync-method", "Sync Method", - "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD, - DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE, - g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", - "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_SERVED, - g_param_spec_uint64 ("bytes-served", "Bytes served", - "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_UNIT, g_param_spec_enum ("burst-unit", "Burst unit", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", @@ -561,18 +431,6 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); - /** - * GstMultiFdSink::clear: - * @gstmultifdsink: the multifdsink element to emit this signal on - * - * Remove all file descriptors from multifdsink. Since multifdsink did not - * open fd's itself, it does not explicitly close the fd. The application - * should do so by connecting to the client-fd-removed callback. - */ - gst_multi_fd_sink_signals[SIGNAL_CLEAR] = - g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /** * GstMultiFdSink::get-stats: @@ -663,11 +521,12 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render); + gstmultihandlesink_class->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); + klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush); - klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); @@ -676,7 +535,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) static void gst_multi_fd_sink_init (GstMultiFdSink * this) { - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN); this->mode = DEFAULT_MODE; @@ -688,13 +547,7 @@ gst_multi_fd_sink_init (GstMultiFdSink * this) this->unit_type = DEFAULT_UNIT_TYPE; this->units_max = DEFAULT_UNITS_MAX; this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; - this->time_min = DEFAULT_TIME_MIN; - this->bytes_min = DEFAULT_BYTES_MIN; - this->buffers_min = DEFAULT_BUFFERS_MIN; - this->recover_policy = DEFAULT_RECOVER_POLICY; - this->timeout = DEFAULT_TIMEOUT; - this->def_sync_method = DEFAULT_SYNC_METHOD; this->def_burst_unit = DEFAULT_BURST_UNIT; this->def_burst_value = DEFAULT_BURST_VALUE; @@ -805,8 +658,8 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, GstTCPUnitType max_unit, guint64 max_value) { GstTCPClient *client; + GstMultiHandleClient *mhclient; GList *clink; - GTimeVal now; gint flags; struct stat statbuf; @@ -823,31 +676,14 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, /* create client datastructure */ client = g_new0 (GstTCPClient, 1); + mhclient = (GstMultiHandleClient *) client; + gst_multi_handle_sink_client_init (mhclient, sync_method); + client->fd.fd = fd; - client->status = GST_CLIENT_STATUS_OK; - client->bufpos = -1; - client->flushcount = -1; - client->bufoffset = 0; - client->sending = NULL; - client->bytes_sent = 0; - client->dropped_buffers = 0; - client->avg_queue_size = 0; - client->first_buffer_ts = GST_CLOCK_TIME_NONE; - client->last_buffer_ts = GST_CLOCK_TIME_NONE; - client->new_connection = TRUE; client->burst_min_unit = min_unit; client->burst_min_value = min_value; client->burst_max_unit = max_unit; client->burst_max_value = max_value; - client->sync_method = sync_method; - client->currently_removing = FALSE; - - /* update start time */ - g_get_current_time (&now); - client->connect_time = GST_TIMEVAL_TO_TIME (now); - client->disconnect_time = 0; - /* set last activity time to connect time */ - client->last_activity_time = client->connect_time; CLIENTS_LOCK (sink); @@ -903,12 +739,12 @@ wrong_limits: } duplicate: { - client->status = GST_CLIENT_STATUS_DUPLICATE; + mhclient->status = GST_CLIENT_STATUS_DUPLICATE; CLIENTS_UNLOCK (sink); GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); g_signal_emit (G_OBJECT (sink), gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, - client->status); + mhclient->status); g_free (client); return; } @@ -918,7 +754,10 @@ duplicate: void gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) { - gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method, + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); + gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method, sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1); } @@ -934,15 +773,16 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) clink = g_hash_table_lookup (sink->fd_hash, &fd); if (clink != NULL) { GstTCPClient *client = (GstTCPClient *) clink->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - if (client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "[fd %5d] Client already disconnecting with status %d", - fd, client->status); + fd, mhclient->status); goto done; } - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient->status = GST_CLIENT_STATUS_REMOVED; gst_multi_fd_sink_remove_client_link (sink, clink); gst_poll_restart (sink->fdset); } else { @@ -965,21 +805,22 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) clink = g_hash_table_lookup (sink->fd_hash, &fd); if (clink != NULL) { GstTCPClient *client = (GstTCPClient *) clink->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - if (client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "[fd %5d] Client already disconnecting with status %d", - fd, client->status); + fd, mhclient->status); goto done; } /* take the position of the client as the number of buffers left to flush. * If the client was at position -1, we flush 0 buffers, 0 == flush 1 * buffer, etc... */ - client->flushcount = client->bufpos + 1; + mhclient->flushcount = mhclient->bufpos + 1; /* mark client as flushing. We can not remove the client right away because * it might have some buffers to flush in the ->sending queue. */ - client->status = GST_CLIENT_STATUS_FLUSHING; + mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); } @@ -990,10 +831,12 @@ done: /* can be called both through the signal (i.e. from any thread) or when * stopping, after the writing thread has shut down */ void -gst_multi_fd_sink_clear (GstMultiFdSink * sink) +gst_multi_fd_sink_clear (GstMultiHandleSink * mhsink) { GList *clients, *next; guint32 cookie; + GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink); + GST_DEBUG_OBJECT (sink, "clearing all clients"); @@ -1001,17 +844,17 @@ gst_multi_fd_sink_clear (GstMultiFdSink * sink) restart: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; + GstMultiHandleClient *mhclient; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients"); goto restart; } - client = (GstTCPClient *) clients->data; + mhclient = (GstMultiHandleClient *) clients->data; next = g_list_next (clients); - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient->status = GST_CLIENT_STATUS_REMOVED; gst_multi_fd_sink_remove_client_link (sink, clients); } gst_poll_restart (sink->fdset); @@ -1044,30 +887,31 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) client = (GstTCPClient *) clink->data; if (client != NULL) { + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GValue value = { 0 }; guint64 interval; result = g_value_array_new (7); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->bytes_sent); + g_value_set_uint64 (&value, mhclient->bytes_sent); result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->connect_time); + g_value_set_uint64 (&value, mhclient->connect_time); result = g_value_array_append (result, &value); g_value_unset (&value); - if (client->disconnect_time == 0) { + if (mhclient->disconnect_time == 0) { GTimeVal nowtv; g_get_current_time (&nowtv); - interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time; + interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time; } else { - interval = client->disconnect_time - client->connect_time; + interval = mhclient->disconnect_time - mhclient->connect_time; } g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->disconnect_time); + g_value_set_uint64 (&value, mhclient->disconnect_time); result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); @@ -1075,19 +919,19 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->last_activity_time); + g_value_set_uint64 (&value, mhclient->last_activity_time); result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->dropped_buffers); + g_value_set_uint64 (&value, mhclient->dropped_buffers); result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->first_buffer_ts); + g_value_set_uint64 (&value, mhclient->first_buffer_ts); result = g_value_array_append (result, &value); g_value_unset (&value); g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->last_buffer_ts); + g_value_set_uint64 (&value, mhclient->last_buffer_ts); result = g_value_array_append (result, &value); } @@ -1114,21 +958,22 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) int fd; GTimeVal now; GstTCPClient *client = (GstTCPClient *) link->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstMultiFdSinkClass *fclass; fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); fd = client->fd.fd; - if (client->currently_removing) { + if (mhclient->currently_removing) { GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd); return; } else { - client->currently_removing = TRUE; + mhclient->currently_removing = TRUE; } /* FIXME: if we keep track of ip we can log it here and signal */ - switch (client->status) { + switch (mhclient->status) { case GST_CLIENT_STATUS_OK: GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason", fd, client); @@ -1153,30 +998,31 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) default: GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p with invalid reason %d", fd, client, - client->status); + mhclient->status); break; } gst_poll_remove_fd (sink->fdset, &client->fd); g_get_current_time (&now); - client->disconnect_time = GST_TIMEVAL_TO_TIME (now); + mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now); /* free client buffers */ - g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (client->sending); - client->sending = NULL; + g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL); + g_slist_free (mhclient->sending); + mhclient->sending = NULL; - if (client->caps) - gst_caps_unref (client->caps); - client->caps = NULL; + if (mhclient->caps) + gst_caps_unref (mhclient->caps); + mhclient->caps = NULL; /* unlock the mutex before signaling because the signal handler * might query some properties */ CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status); + gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, + mhclient->status); /* lock again before we remove the client completely */ CLIENTS_LOCK (sink); @@ -1217,6 +1063,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, { int avail, fd; gboolean ret; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; fd = client->fd.fd; @@ -1231,11 +1078,11 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, if (avail == 0) { /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd); - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (avail < 0) { GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; } else { guint8 dummy[512]; @@ -1255,12 +1102,12 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, if (nread < -1) { GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)", fd, to_read, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } else if (nread == 0) { GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } @@ -1275,7 +1122,7 @@ ioctl_failed: { GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", fd, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; } } @@ -1302,20 +1149,21 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, /* TRUE: send them if the new caps have them */ gboolean send_streamheader = FALSE; GstStructure *s; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; /* before we queue the buffer, we check if we need to queue streamheader * buffers (because it's a new client, or because they changed) */ caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink)); - if (!client->caps) { + if (!mhclient->caps) { GST_DEBUG_OBJECT (sink, "[fd %5d] no previous caps for this client, send streamheader", client->fd.fd); send_streamheader = TRUE; - client->caps = gst_caps_ref (caps); + mhclient->caps = gst_caps_ref (caps); } else { /* there were previous caps recorded, so compare */ - if (!gst_caps_is_equal (caps, client->caps)) { + if (!gst_caps_is_equal (caps, mhclient->caps)) { const GValue *sh1, *sh2; /* caps are not equal, but could still have the same streamheader */ @@ -1327,7 +1175,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, client->fd.fd); } else { /* there is a new streamheader */ - s = gst_caps_get_structure (client->caps, 0); + s = gst_caps_get_structure (mhclient->caps, 0); if (!gst_structure_has_field (s, "streamheader")) { /* no previous streamheader, so send the new one */ GST_DEBUG_OBJECT (sink, @@ -1356,8 +1204,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, } } /* Replace the old caps */ - gst_caps_unref (client->caps); - client->caps = gst_caps_ref (caps); + gst_caps_unref (mhclient->caps); + mhclient->caps = gst_caps_ref (caps); } if (G_UNLIKELY (send_streamheader)) { @@ -1392,7 +1240,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, client->fd.fd, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); - client->sending = g_slist_append (client->sending, buffer); + mhclient->sending = g_slist_append (mhclient->sending, buffer); } } } @@ -1404,7 +1252,7 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, client->fd.fd, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); - client->sending = g_slist_append (client->sending, buffer); + mhclient->sending = g_slist_append (mhclient->sending, buffer); return TRUE; } @@ -1680,27 +1528,28 @@ static gint gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) { gint result; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GST_DEBUG_OBJECT (sink, "[fd %5d] new client, deciding where to start in queue", client->fd.fd); GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", sink->bufqueue->len); - switch (client->sync_method) { + switch (mhclient->sync_method) { case GST_SYNC_METHOD_LATEST: /* no syncing, we are happy with whatever the client is going to get */ - result = client->bufpos; + result = mhclient->bufpos; GST_DEBUG_OBJECT (sink, "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result); break; case GST_SYNC_METHOD_NEXT_KEYFRAME: { - /* if one of the new buffers (between client->bufpos and 0) in the queue + /* if one of the new buffers (between mhclient->bufpos and 0) in the queue * is a sync point, we can proceed, otherwise we need to keep waiting */ GST_LOG_OBJECT (sink, "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd, - client->bufpos); + mhclient->bufpos); - result = find_prev_syncframe (sink, client->bufpos); + result = find_prev_syncframe (sink, mhclient->bufpos); if (result != -1) { GST_DEBUG_OBJECT (sink, "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d", @@ -1713,7 +1562,7 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) GST_LOG_OBJECT (sink, "[fd %5d] new client, skipping buffer(s), no syncpoint found", client->fd.fd); - client->bufpos = -1; + mhclient->bufpos = -1; break; } case GST_SYNC_METHOD_LATEST_KEYFRAME: @@ -1738,9 +1587,9 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd); /* throw client to the waiting state */ - client->bufpos = -1; + mhclient->bufpos = -1; /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; break; } case GST_SYNC_METHOD_BURST: @@ -1813,9 +1662,9 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); /* throw client to the waiting state */ - client->bufpos = -1; + mhclient->bufpos = -1; /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; result = -1; break; } @@ -1859,8 +1708,8 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) break; } default: - g_warning ("unknown sync method %d", client->sync_method); - result = client->bufpos; + g_warning ("unknown sync method %d", mhclient->sync_method); + result = mhclient->bufpos; break; } return result; @@ -1875,13 +1724,13 @@ gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) * We first check to see if we need to send streamheaders. If so, we queue them. * * Then we run into the main loop that tries to send as many buffers as - * possible. It will first exhaust the client->sending queue and if the queue + * possible. It will first exhaust the mhclient->sending queue and if the queue * is empty, it will pick a buffer from the global queue. * - * Sending the buffers from the client->sending queue is basically writing + * Sending the buffers from the mhclient->sending queue is basically writing * the bytes to the socket and maintaining a count of the bytes that were * sent. When the buffer is completely sent, it is removed from the - * client->sending queue and we try to pick a new buffer for sending. + * mhclient->sending queue and we try to pick a new buffer for sending. * * When the sending returns a partial buffer we stop sending more data as * the next send operation could block. @@ -1897,24 +1746,28 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, gboolean flushing; GstClockTime now; GTimeVal nowtv; + GstMultiHandleSink *mhsink; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; + + mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); - flushing = client->status == GST_CLIENT_STATUS_FLUSHING; + flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING; more = TRUE; do { gint maxsize; - if (!client->sending) { + if (!mhclient->sending) { /* client is not working on a buffer */ - if (client->bufpos == -1) { + if (mhclient->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); /* if we flushed out all of the client buffers, we can stop */ - if (client->flushcount == 0) + if (mhclient->flushcount == 0) goto flushed; return TRUE; @@ -1925,13 +1778,13 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ - if (client->new_connection && !flushing) { + if (mhclient->new_connection && !flushing) { gint position = gst_multi_fd_sink_new_client (sink, client); if (position >= 0) { /* we got a valid spot in the queue */ - client->new_connection = FALSE; - client->bufpos = position; + mhclient->new_connection = FALSE; + mhclient->bufpos = position; } else { /* cannot send data to this client yet */ gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); @@ -1940,48 +1793,48 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, } /* we flushed all remaining buffers, no need to get a new one */ - if (client->flushcount == 0) + if (mhclient->flushcount == 0) goto flushed; /* grab buffer */ - buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); - client->bufpos--; + buf = g_array_index (sink->bufqueue, GstBuffer *, mhclient->bufpos); + mhclient->bufpos--; /* update stats */ timestamp = GST_BUFFER_TIMESTAMP (buf); - if (client->first_buffer_ts == GST_CLOCK_TIME_NONE) - client->first_buffer_ts = timestamp; + if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE) + mhclient->first_buffer_ts = timestamp; if (timestamp != -1) - client->last_buffer_ts = timestamp; + mhclient->last_buffer_ts = timestamp; /* decrease flushcount */ - if (client->flushcount != -1) - client->flushcount--; + if (mhclient->flushcount != -1) + mhclient->flushcount--; GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - fd, client, client->bufpos); + fd, client, mhclient->bufpos); /* queueing a buffer will ref it */ gst_multi_fd_sink_client_queue_buffer (sink, client, buf); /* need to start from the first byte for this new buffer */ - client->bufoffset = 0; + mhclient->bufoffset = 0; } } /* see if we need to send something */ - if (client->sending) { + if (mhclient->sending) { ssize_t wrote; GstBuffer *head; GstMapInfo info; guint8 *data; /* pick first buffer from list */ - head = GST_BUFFER (client->sending->data); + head = GST_BUFFER (mhclient->sending->data); g_assert (gst_buffer_map (head, &info, GST_MAP_READ)); data = info.data; - maxsize = info.size - client->bufoffset; + maxsize = info.size - mhclient->bufoffset; /* try to write the complete buffer */ #ifdef MSG_NOSIGNAL @@ -1990,9 +1843,9 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, #define FLAGS 0 #endif if (client->is_socket) { - wrote = send (fd, data + client->bufoffset, maxsize, FLAGS); + wrote = send (fd, data + mhclient->bufoffset, maxsize, FLAGS); } else { - wrote = write (fd, data + client->bufoffset, maxsize); + wrote = write (fd, data + mhclient->bufoffset, maxsize); } gst_buffer_unmap (head, &info); @@ -2012,19 +1865,19 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, * stop sending more */ GST_LOG_OBJECT (sink, "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote); - client->bufoffset += wrote; + mhclient->bufoffset += wrote; more = FALSE; } else { /* complete buffer was written, we can proceed to the next one */ - client->sending = g_slist_remove (client->sending, head); + mhclient->sending = g_slist_remove (mhclient->sending, head); gst_buffer_unref (head); /* make sure we start from byte 0 for the next buffer */ - client->bufoffset = 0; + mhclient->bufoffset = 0; } /* update stats */ - client->bytes_sent += wrote; - client->last_activity_time = now; - sink->bytes_served += wrote; + mhclient->bytes_sent += wrote; + mhclient->last_activity_time = now; + mhsink->bytes_served += wrote; } } } while (more); @@ -2035,13 +1888,13 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, flushed: { GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd); - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; return FALSE; } write_error: @@ -2049,7 +1902,7 @@ write_error: GST_WARNING_OBJECT (sink, "[fd %5d] could not write, removing client: %s (%d)", fd, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; } } @@ -2062,16 +1915,18 @@ static gint gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) { gint newbufpos; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GST_WARNING_OBJECT (sink, "[fd %5d] client %p is lagging at %d, recover using policy %d", - client->fd.fd, client, client->bufpos, sink->recover_policy); + client->fd.fd, client, mhclient->bufpos, mhsink->recover_policy); - switch (sink->recover_policy) { + switch (mhsink->recover_policy) { case GST_RECOVER_POLICY_NONE: /* do nothing, client will catch up or get kicked out when it reaches * the hard max */ - newbufpos = client->bufpos; + newbufpos = mhclient->bufpos; break; case GST_RECOVER_POLICY_RESYNC_LATEST: /* move to beginning of queue */ @@ -2112,7 +1967,7 @@ gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) * tail buffer if the max queue size is exceeded, unreffing the queued buffer. * Note that unreffing the buffer is not a problem as clients who * started writing out this buffer will still have a reference to it in the - * client->sending queue. + * mhclient->sending queue. * * After adding the buffer, we update all client positions in the queue. If * a client moves over the soft max, we start the recovery procedure for this @@ -2136,6 +1991,9 @@ gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) GstClockTime now; gint max_buffers, soft_max_buffers; guint cookie; + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -2164,6 +2022,7 @@ restart: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; + GstMultiHandleClient *mhclient; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting"); @@ -2171,22 +2030,23 @@ restart: } client = (GstTCPClient *) clients->data; + mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); - client->bufpos++; + mhclient->bufpos++; GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - client->fd.fd, client, client->bufpos); + client->fd.fd, client, mhclient->bufpos); /* check soft max if needed, recover client */ - if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) { + if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; newpos = gst_multi_fd_sink_recover_client (sink, client); - if (newpos != client->bufpos) { - client->dropped_buffers += client->bufpos - newpos; - client->bufpos = newpos; - client->discont = TRUE; + if (newpos != mhclient->bufpos) { + mhclient->dropped_buffers += mhclient->bufpos - newpos; + mhclient->bufpos = newpos; + mhclient->discont = TRUE; GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d", - client->fd.fd, client, client->bufpos); + client->fd.fd, client, mhclient->bufpos); } else { GST_INFO_OBJECT (sink, "[fd %5d] client %p not recovering position", @@ -2194,29 +2054,29 @@ restart: } } /* check hard max and timeout, remove client */ - if ((max_buffers > 0 && client->bufpos >= max_buffers) || - (sink->timeout > 0 - && now - client->last_activity_time > sink->timeout)) { + if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) || + (mhsink->timeout > 0 + && now - mhclient->last_activity_time > mhsink->timeout)) { /* remove client */ GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing", client->fd.fd, client); /* remove the client, the fd set will be cleared and the select thread * will be signaled */ - client->status = GST_CLIENT_STATUS_SLOW; + mhclient->status = GST_CLIENT_STATUS_SLOW; /* set client to invalid position while being removed */ - client->bufpos = -1; + mhclient->bufpos = -1; gst_multi_fd_sink_remove_client_link (sink, clients); need_signal = TRUE; continue; - } else if (client->bufpos == 0 || client->new_connection) { + } else if (mhclient->bufpos == 0 || mhclient->new_connection) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE); need_signal = TRUE; } /* keep track of maximum buffer usage */ - if (client->bufpos > max_buffer_usage) { - max_buffer_usage = client->bufpos; + if (mhclient->bufpos > max_buffer_usage) { + max_buffer_usage = mhclient->bufpos; } } @@ -2227,13 +2087,14 @@ restart: GST_LOG_OBJECT (sink, "extending queue %d to respect time_min %" GST_TIME_FORMAT ", bytes_min %d, buffers_min %d", max_buffer_usage, - GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min); + GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min, + mhsink->buffers_min); /* get index where the limits are ok, we don't really care if all limits * are ok, we just queue as much as we need. We also don't compare against * the max limits. */ - find_limits (sink, &usage, sink->bytes_min, sink->buffers_min, - sink->time_min, &max, -1, -1, -1); + find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min, + mhsink->time_min, &max, -1, -1, -1); max_buffer_usage = MAX (max_buffer_usage, usage + 1); GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage); @@ -2242,8 +2103,8 @@ restart: /* now look for sync points and make sure there is at least one * sync point in the queue. We only do this if the LATEST_KEYFRAME or * BURST_KEYFRAME mode is selected */ - if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || - sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { + if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || + mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { /* no point in searching beyond the queue length */ gint limit = queuelen; GstBuffer *buf; @@ -2309,6 +2170,9 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) gboolean try_again; GstMultiFdSinkClass *fclass; guint cookie; + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); @@ -2321,8 +2185,9 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) * - client socket output (ie, client reads) */ GST_LOG_OBJECT (sink, "waiting on action on fdset"); - result = gst_poll_wait (sink->fdset, sink->timeout != 0 ? sink->timeout : - GST_CLOCK_TIME_NONE); + result = + gst_poll_wait (sink->fdset, + mhsink->timeout != 0 ? mhsink->timeout : GST_CLOCK_TIME_NONE); /* Handle the special case in which the sink is not receiving more buffers * and will not disconnect inactive client in the streaming thread. */ @@ -2336,12 +2201,14 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) CLIENTS_LOCK (sink); for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; + GstMultiHandleClient *mhclient; client = (GstTCPClient *) clients->data; + mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); - if (sink->timeout > 0 - && now - client->last_activity_time > sink->timeout) { - client->status = GST_CLIENT_STATUS_SLOW; + if (mhsink->timeout > 0 + && now - mhclient->last_activity_time > mhsink->timeout) { + mhclient->status = GST_CLIENT_STATUS_SLOW; gst_multi_fd_sink_remove_client_link (sink, clients); } } @@ -2358,6 +2225,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; + GstMultiHandleClient *mhclient; int fd; long flags; int res; @@ -2368,6 +2236,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) } client = (GstTCPClient *) clients->data; + mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); fd = client->fd.fd; @@ -2377,7 +2246,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)", fd, g_strerror (errno), errno); if (errno == EBADF) { - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; /* releases the CLIENTS lock */ gst_multi_fd_sink_remove_client_link (sink, clients); } @@ -2415,6 +2284,7 @@ restart2: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; + GstMultiHandleClient *mhclient; if (sink->clients_cookie != cookie) { GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date"); @@ -2422,22 +2292,23 @@ restart2: } client = (GstTCPClient *) clients->data; + mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); - if (client->status != GST_CLIENT_STATUS_FLUSHING - && client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_FLUSHING + && mhclient->status != GST_CLIENT_STATUS_OK) { gst_multi_fd_sink_remove_client_link (sink, clients); continue; } if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) { - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; gst_multi_fd_sink_remove_client_link (sink, clients); continue; } if (gst_poll_fd_has_error (sink->fdset, &client->fd)) { GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; gst_multi_fd_sink_remove_client_link (sink, clients); continue; } @@ -2478,11 +2349,13 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) #if 0 GstCaps *bufcaps, *padcaps; #endif + GstMultiHandleSink *mhsink; sink = GST_MULTI_FD_SINK (bsink); + mhsink = GST_MULTI_HANDLE_SINK (sink); - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN), - GST_FLOW_WRONG_STATE); + g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, + GST_MULTI_HANDLE_SINK_OPEN), GST_FLOW_FLUSHING); #if 0 /* since we check every buffer for streamheader caps, we need to make @@ -2563,7 +2436,7 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) /* queue the buffer, this is a regular data buffer. */ gst_multi_fd_sink_queue_buffer (sink, buf); - sink->bytes_to_serve += gst_buffer_get_size (buf); + mhsink->bytes_to_serve += gst_buffer_get_size (buf); } return GST_FLOW_OK; @@ -2596,15 +2469,6 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, case PROP_BUFFERS_SOFT_MAX: multifdsink->units_soft_max = g_value_get_int (value); break; - case PROP_TIME_MIN: - multifdsink->time_min = g_value_get_int64 (value); - break; - case PROP_BYTES_MIN: - multifdsink->bytes_min = g_value_get_int (value); - break; - case PROP_BUFFERS_MIN: - multifdsink->buffers_min = g_value_get_int (value); - break; case PROP_UNIT_TYPE: multifdsink->unit_type = g_value_get_enum (value); break; @@ -2614,15 +2478,6 @@ gst_multi_fd_sink_set_property (GObject * object, guint prop_id, case PROP_UNITS_SOFT_MAX: multifdsink->units_soft_max = g_value_get_int64 (value); break; - case PROP_RECOVER_POLICY: - multifdsink->recover_policy = g_value_get_enum (value); - break; - case PROP_TIMEOUT: - multifdsink->timeout = g_value_get_uint64 (value); - break; - case PROP_SYNC_METHOD: - multifdsink->def_sync_method = g_value_get_enum (value); - break; case PROP_BURST_UNIT: multifdsink->def_burst_unit = g_value_get_enum (value); break; @@ -2664,15 +2519,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_BUFFERS_SOFT_MAX: g_value_set_int (value, multifdsink->units_soft_max); break; - case PROP_TIME_MIN: - g_value_set_int64 (value, multifdsink->time_min); - break; - case PROP_BYTES_MIN: - g_value_set_int (value, multifdsink->bytes_min); - break; - case PROP_BUFFERS_MIN: - g_value_set_int (value, multifdsink->buffers_min); - break; case PROP_BUFFERS_QUEUED: g_value_set_uint (value, multifdsink->buffers_queued); break; @@ -2691,21 +2537,6 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_UNITS_SOFT_MAX: g_value_set_int64 (value, multifdsink->units_soft_max); break; - case PROP_RECOVER_POLICY: - g_value_set_enum (value, multifdsink->recover_policy); - break; - case PROP_TIMEOUT: - g_value_set_uint64 (value, multifdsink->timeout); - break; - case PROP_SYNC_METHOD: - g_value_set_enum (value, multifdsink->def_sync_method); - break; - case PROP_BYTES_TO_SERVE: - g_value_set_uint64 (value, multifdsink->bytes_to_serve); - break; - case PROP_BYTES_SERVED: - g_value_set_uint64 (value, multifdsink->bytes_served); - break; case PROP_BURST_UNIT: g_value_set_enum (value, multifdsink->def_burst_unit); break; @@ -2738,11 +2569,13 @@ gst_multi_fd_sink_start (GstBaseSink * bsink) { GstMultiFdSinkClass *fclass; GstMultiFdSink *this; + GstMultiHandleSink *mhsink; - if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) + if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN)) return TRUE; this = GST_MULTI_FD_SINK (bsink); + mhsink = GST_MULTI_HANDLE_SINK (bsink); fclass = GST_MULTI_FD_SINK_GET_CLASS (this); GST_INFO_OBJECT (this, "starting in mode %d", this->mode); @@ -2750,8 +2583,8 @@ gst_multi_fd_sink_start (GstBaseSink * bsink) goto socket_pair; this->streamheader = NULL; - this->bytes_to_serve = 0; - this->bytes_served = 0; + mhsink->bytes_to_serve = 0; + mhsink->bytes_served = 0; if (fclass->init) { fclass->init (this); @@ -2767,7 +2600,7 @@ gst_multi_fd_sink_start (GstBaseSink * bsink) (GThreadFunc) gst_multi_fd_sink_thread, this); #endif - GST_OBJECT_FLAG_SET (this, GST_MULTI_FD_SINK_OPEN); + GST_OBJECT_FLAG_SET (this, GST_MULTI_HANDLE_SINK_OPEN); return TRUE; @@ -2790,14 +2623,16 @@ static gboolean gst_multi_fd_sink_stop (GstBaseSink * bsink) { GstMultiFdSinkClass *fclass; + GstMultiHandleSinkClass *mhclass; GstMultiFdSink *this; GstBuffer *buf; int i; this = GST_MULTI_FD_SINK (bsink); fclass = GST_MULTI_FD_SINK_GET_CLASS (this); + mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (this); - if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) + if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN)) return TRUE; this->running = FALSE; @@ -2811,7 +2646,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) } /* free the clients */ - gst_multi_fd_sink_clear (this); + mhclass->clear (GST_MULTI_HANDLE_SINK (this)); if (this->streamheader) { g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); @@ -2841,7 +2676,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) } /* freeing the array is done in _finalize */ } - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN); return TRUE; } diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 8cc541b1c2..a224e90d44 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -46,12 +46,6 @@ G_BEGIN_DECLS typedef struct _GstMultiFdSink GstMultiFdSink; typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; -typedef enum { - GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstMultiFdSinkFlags; - /** * GstTCPUnitType: @@ -73,43 +67,20 @@ typedef enum /* structure for a client */ typedef struct { + GstMultiHandleClient client; + GstPollFD fd; - gint bufpos; /* position of this client in the global queue */ - gint flushcount; /* the remaining number of buffers to flush out or -1 if the - client is not flushing. */ - - GstClientStatus status; gboolean is_socket; - GSList *sending; /* the buffers we need to send */ - gint bufoffset; /* offset in the first buffer */ - - gboolean discont; - gboolean caps_sent; - gboolean new_connection; - - gboolean currently_removing; /* method to sync client when connecting */ GstSyncMethod sync_method; GstTCPUnitType burst_min_unit; - guint64 burst_min_value; + guint64 burst_min_value; GstTCPUnitType burst_max_unit; - guint64 burst_max_value; - - GstCaps *caps; /* caps of last queued buffer */ - - /* stats */ - guint64 bytes_sent; - guint64 connect_time; - guint64 disconnect_time; - guint64 last_activity_time; - guint64 dropped_buffers; - guint64 avg_queue_size; - guint64 first_buffer_ts; - guint64 last_buffer_ts; + guint64 burst_max_value; } GstTCPClient; /** @@ -118,12 +89,9 @@ typedef struct { * The multifdsink object structure. */ struct _GstMultiFdSink { - GstBaseSink element; + GstMultiHandleSink element; /*< private >*/ - guint64 bytes_to_serve; /* how much bytes we must serve */ - guint64 bytes_served; /* how much bytes have we served */ - GRecMutex clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ GHashTable *fd_hash; /* index on fd to client */ @@ -149,20 +117,10 @@ struct _GstMultiFdSink { GstTCPUnitType unit_type;/* the type of the units */ gint64 units_max; /* max units to queue for a client */ gint64 units_soft_max; /* max units a client can lag before recovery starts */ - GstRecoverPolicy recover_policy; - GstClockTime timeout; /* max amount of nanoseconds to remain idle */ - GstSyncMethod def_sync_method; /* what method to use for connecting clients */ GstTCPUnitType def_burst_unit; guint64 def_burst_value; - /* these values are used to control the amount of data - * kept in the queues. It allows clients to perform a burst - * on connect. */ - gint bytes_min; /* min number of bytes to queue */ - gint64 time_min; /* min time to queue */ - gint buffers_min; /* min number of buffers to queue */ - gboolean resend_streamheader; /* resend streamheader if it changes */ /* stats */ @@ -174,7 +132,7 @@ struct _GstMultiFdSink { }; struct _GstMultiFdSinkClass { - GstBaseSinkClass parent_class; + GstMultiHandleSinkClass parent_class; /* element methods */ void (*add) (GstMultiFdSink *sink, int fd); @@ -206,7 +164,7 @@ void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstS GstTCPUnitType max_unit, guint64 max_value); void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_clear (GstMultiFdSink *sink); +void gst_multi_fd_sink_clear (GstMultiHandleSink *sink); GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); G_END_DECLS diff --git a/gst/tcp/gstmultihandlesink.c b/gst/tcp/gstmultihandlesink.c index d85874677d..291039a412 100644 --- a/gst/tcp/gstmultihandlesink.c +++ b/gst/tcp/gstmultihandlesink.c @@ -22,26 +22,26 @@ */ /** - * SECTION:element-multisocketsink + * SECTION:element-multihandlesink * @see_also: tcpserversink * * This plugin writes incoming data to a set of file descriptors. The - * file descriptors can be added to multisocketsink by emitting the #GstMultiHandleSink::add signal. + * file descriptors can be added to multihandlesink by emitting the #GstMultiHandleSink::add signal. * For each descriptor added, the #GstMultiHandleSink::client-added signal will be called. * * As of version 0.10.8, a client can also be added with the #GstMultiHandleSink::add-full signal * that allows for more control over what and how much data a client * initially receives. * - * Clients can be removed from multisocketsink by emitting the #GstMultiHandleSink::remove signal. For + * Clients can be removed from multihandlesink by emitting the #GstMultiHandleSink::remove signal. For * each descriptor removed, the #GstMultiHandleSink::client-removed signal will be called. The - * #GstMultiHandleSink::client-removed signal can also be fired when multisocketsink decides that a + * #GstMultiHandleSink::client-removed signal can also be fired when multihandlesink decides that a * client is not active anymore or, depending on the value of the * #GstMultiHandleSink:recover-policy property, if the client is reading too slowly. - * In all cases, multisocketsink will never close a file descriptor itself. - * The user of multisocketsink is responsible for closing all file descriptors. + * In all cases, multihandlesink will never close a file descriptor itself. + * The user of multihandlesink is responsible for closing all file descriptors. * This can for example be done in response to the #GstMultiHandleSink::client-fd-removed signal. - * Note that multisocketsink still has a reference to the file descriptor when the + * Note that multihandlesink still has a reference to the file descriptor when the * #GstMultiHandleSink::client-removed signal is emitted, so that "get-stats" can be performed on * the descriptor; it is therefore not safe to close the file descriptor in * the #GstMultiHandleSink::client-removed signal handler, and you should use the @@ -52,11 +52,11 @@ * client write can block the pipeline and that clients can read with different * speeds. * - * When adding a client to multisocketsink, the #GstMultiHandleSink:sync-method property will define + * When adding a client to multihandlesink, the #GstMultiHandleSink:sync-method property will define * which buffer in the queued buffers will be sent first to the client. Clients * can be sent the most recent buffer (which might not be decodable by the * client if it is not a keyframe), the next keyframe received in - * multisocketsink (which can take some time depending on the keyframe rate), or the + * multihandlesink (which can take some time depending on the keyframe rate), or the * last received keyframe (which will cause a simple burst-on-connect). * Multisocketsink will always keep at least one keyframe in its internal buffers * when the sync-mode is set to latest-keyframe. @@ -76,13 +76,13 @@ * actually be honored. * * When streaming data, clients are allowed to read at a different rate than - * the rate at which multisocketsink receives data. If the client is reading too - * fast, no data will be send to the client until multisocketsink receives more + * the rate at which multihandlesink receives data. If the client is reading too + * fast, no data will be send to the client until multihandlesink receives more * data. If the client, however, reads too slowly, data for that client will be - * queued up in multisocketsink. Two properties control the amount of data - * (buffers) that is queued in multisocketsink: #GstMultiHandleSink:buffers-max and + * queued up in multihandlesink. Two properties control the amount of data + * (buffers) that is queued in multihandlesink: #GstMultiHandleSink:buffers-max and * #GstMultiHandleSink:buffers-soft-max. A client that falls behind by - * #GstMultiHandleSink:buffers-max is removed from multisocketsink forcibly. + * #GstMultiHandleSink:buffers-max is removed from multihandlesink forcibly. * * A client with a lag of at least #GstMultiHandleSink:buffers-soft-max enters the recovery * procedure which is controlled with the #GstMultiHandleSink:recover-policy property. @@ -92,7 +92,7 @@ * RESYNC_KEYFRAME positions the client at the most recent keyframe in the * buffer queue. * - * multisocketsink will by default synchronize on the clock before serving the + * multihandlesink will by default synchronize on the clock before serving the * buffers to the clients. This behaviour can be disabled by setting the sync * property to FALSE. Multisocketsink will by default not do QoS and will never * drop late buffers. @@ -106,7 +106,7 @@ #include -#include "gstmultisocketsink.h" +#include "gstmultihandlesink.h" #include "gsttcp-marshal.h" #ifndef G_OS_WIN32 @@ -120,8 +120,8 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); -GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug); -#define GST_CAT_DEFAULT (multisocketsink_debug) +GST_DEBUG_CATEGORY_STATIC (multihandlesink_debug); +#define GST_CAT_DEFAULT (multihandlesink_debug) /* MultiHandleSink signals and args */ enum @@ -168,6 +168,7 @@ enum enum { PROP_0, +#if 0 PROP_MODE, PROP_BUFFERS_QUEUED, PROP_BYTES_QUEUED, @@ -179,6 +180,7 @@ enum PROP_BUFFERS_MAX, PROP_BUFFERS_SOFT_MAX, +#endif PROP_TIME_MIN, PROP_BYTES_MIN, @@ -190,6 +192,7 @@ enum PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, +#if 0 PROP_BURST_FORMAT, PROP_BURST_VALUE, @@ -200,12 +203,15 @@ enum PROP_RESEND_STREAMHEADER, PROP_NUM_SOCKETS, +#endif PROP_LAST }; -#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type()) -static GType +// FIXME: make static again when refactored +//#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type()) +//static GType +GType gst_multi_handle_sink_recover_policy_get_type (void) { static GType recover_policy_type = 0; @@ -229,8 +235,10 @@ gst_multi_handle_sink_recover_policy_get_type (void) return recover_policy_type; } -#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type()) -static GType +// FIXME: make static again after refactoring +//#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type()) +//static GType +GType gst_multi_handle_sink_sync_method_get_type (void) { static GType sync_method_type = 0; @@ -259,8 +267,10 @@ gst_multi_handle_sink_sync_method_get_type (void) return sync_method_type; } -#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type()) -static GType +// FIXME: make static again after refactoring +//#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type()) +//static GType +GType gst_multi_handle_sink_client_status_get_type (void) { static GType client_status_type = 0; @@ -283,19 +293,25 @@ gst_multi_handle_sink_client_status_get_type (void) return client_status_type; } +#if 0 static void gst_multi_handle_sink_finalize (GObject * object); +#endif +#if 0 static void gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink, GList * link); static gboolean gst_multi_handle_sink_socket_condition (GSocket * socket, GIOCondition condition, GstMultiHandleSink * sink); +#endif +#if 0 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf); static gboolean gst_multi_handle_sink_unlock (GstBaseSink * bsink); static gboolean gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink); static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement * element, GstStateChange transition); +#endif static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -312,26 +328,36 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; +#if 0 GstBaseSinkClass *gstbasesink_class; +#endif gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; +#if 0 gstbasesink_class = (GstBaseSinkClass *) klass; +#endif gobject_class->set_property = gst_multi_handle_sink_set_property; gobject_class->get_property = gst_multi_handle_sink_get_property; +#if 0 gobject_class->finalize = gst_multi_handle_sink_finalize; +#endif +#if 0 g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", "max number of buffers to queue for a client (-1 = no limit)", -1, G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#endif +#if 0 g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max", "Buffers soft max", "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#endif g_object_class_install_property (gobject_class, PROP_BYTES_MIN, g_param_spec_int ("bytes-min", "Bytes min", @@ -349,6 +375,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +#if 0 g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", @@ -377,6 +404,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_param_spec_uint64 ("time-queued", "Time queued", "Number of time currently queued", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); +#endif #endif g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY, @@ -402,6 +430,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); +#if 0 g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, g_param_spec_enum ("burst-format", "Burst format", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", @@ -446,13 +475,15 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_param_spec_uint ("num-sockets", "Number of sockets", "The current number of client sockets", 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); +#endif +#if 0 /** * GstMultiHandleSink::add: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * @socket: the socket to add to multisocketsink + * @gstmultihandlesink: the multihandlesink element to emit this signal on + * @socket: the socket to add to multihandlesink * - * Hand the given open socket to multisocketsink to write to. + * Hand the given open socket to multihandlesink to write to. */ gst_multi_handle_sink_signals[SIGNAL_ADD] = g_signal_new ("add", G_TYPE_FROM_CLASS (klass), @@ -461,8 +492,8 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); /** * GstMultiHandleSink::add-full: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * @socket: the socket to add to multisocketsink + * @gstmultihandlesink: the multihandlesink element to emit this signal on + * @socket: the socket to add to multihandlesink * @sync: the sync method to use * @format_min: the format of @value_min * @value_min: the minimum amount of data to burst expressed in @@ -471,7 +502,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) * @value_max: the maximum amount of data to burst expressed in * @format_max units. * - * Hand the given open socket to multisocketsink to write to and + * Hand the given open socket to multihandlesink to write to and * specify the burst parameters for the new connection. */ gst_multi_handle_sink_signals[SIGNAL_ADD_BURST] = @@ -483,10 +514,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) GST_TYPE_FORMAT, G_TYPE_UINT64); /** * GstMultiHandleSink::remove: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * @socket: the socket to remove from multisocketsink + * @gstmultihandlesink: the multihandlesink element to emit this signal on + * @socket: the socket to remove from multihandlesink * - * Remove the given open socket from multisocketsink. + * Remove the given open socket from multihandlesink. */ gst_multi_handle_sink_signals[SIGNAL_REMOVE] = g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), @@ -495,10 +526,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); /** * GstMultiHandleSink::remove-flush: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * @socket: the socket to remove from multisocketsink + * @gstmultihandlesink: the multihandlesink element to emit this signal on + * @socket: the socket to remove from multihandlesink * - * Remove the given open socket from multisocketsink after flushing all + * Remove the given open socket from multihandlesink after flushing all * the pending data to the socket. */ gst_multi_handle_sink_signals[SIGNAL_REMOVE_FLUSH] = @@ -506,11 +537,12 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiHandleSinkClass, remove_flush), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); +#endif /** * GstMultiHandleSink::clear: - * @gstmultisocketsink: the multisocketsink element to emit this signal on + * @gstmultihandlesink: the multihandlesink element to emit this signal on * - * Remove all sockets from multisocketsink. Since multisocketsink did not + * Remove all sockets from multihandlesink. Since multihandlesink did not * open sockets itself, it does not explicitly close the sockets. The application * should do so by connecting to the client-socket-removed callback. */ @@ -520,10 +552,11 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_STRUCT_OFFSET (GstMultiHandleSinkClass, clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); +#if 0 /** * GstMultiHandleSink::get-stats: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * @socket: the socket to get stats of from multisocketsink + * @gstmultihandlesink: the multihandlesink element to emit this signal on + * @socket: the socket to get stats of from multihandlesink * * Get statistics about @socket. This function returns a GstStructure. * @@ -542,10 +575,10 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) /** * GstMultiHandleSink::client-added: - * @gstmultisocketsink: the multisocketsink element that emitted this signal - * @socket: the socket that was added to multisocketsink + * @gstmultihandlesink: the multihandlesink element that emitted this signal + * @socket: the socket that was added to multihandlesink * - * The given socket was added to multisocketsink. This signal will + * The given socket was added to multihandlesink. This signal will * be emitted from the streaming thread so application should be prepared * for that. */ @@ -556,15 +589,15 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_TYPE_NONE, 1, G_TYPE_OBJECT); /** * GstMultiHandleSink::client-removed: - * @gstmultisocketsink: the multisocketsink element that emitted this signal - * @socket: the socket that is to be removed from multisocketsink + * @gstmultihandlesink: the multihandlesink element that emitted this signal + * @socket: the socket that is to be removed from multihandlesink * @status: the reason why the client was removed * - * The given socket is about to be removed from multisocketsink. This + * The given socket is about to be removed from multihandlesink. This * signal will be emitted from the streaming thread so applications should * be prepared for that. * - * @gstmultisocketsink still holds a handle to @socket so it is possible to call + * @gstmultihandlesink still holds a handle to @socket so it is possible to call * the get-stats signal from this callback. For the same reason it is * not safe to close() and reuse @socket in this callback. */ @@ -575,14 +608,14 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS); /** * GstMultiHandleSink::client-socket-removed: - * @gstmultisocketsink: the multisocketsink element that emitted this signal - * @socket: the socket that was removed from multisocketsink + * @gstmultihandlesink: the multihandlesink element that emitted this signal + * @socket: the socket that was removed from multihandlesink * - * The given socket was removed from multisocketsink. This signal will + * The given socket was removed from multihandlesink. This signal will * be emitted from the streaming thread so applications should be prepared * for that. * - * In this callback, @gstmultisocketsink has removed all the information + * In this callback, @gstmultihandlesink has removed all the information * associated with @socket and it is therefore not possible to call get-stats * with @socket. It is however safe to close() and reuse @fd in the callback. * @@ -593,6 +626,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiHandleSinkClass, client_socket_removed), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); +#endif gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); @@ -604,6 +638,7 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) "Wim Taymans , " "Sebastian Dröge "); +#if 0 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_change_state); @@ -611,15 +646,18 @@ gst_multi_handle_sink_class_init (GstMultiHandleSinkClass * klass) gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock); gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_unlock_stop); +#endif +#if 0 klass->add = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add); klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_add_full); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove); klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_remove_flush); klass->clear = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_clear); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_handle_sink_get_stats); +#endif - GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0, + GST_DEBUG_CATEGORY_INIT (multihandlesink_debug, "multihandlesink", 0, "Multi socket sink"); } @@ -655,6 +693,7 @@ gst_multi_handle_sink_init (GstMultiHandleSink * this) this->cancellable = g_cancellable_new (); } +#if 0 static void gst_multi_handle_sink_finalize (GObject * object) { @@ -754,7 +793,37 @@ setup_dscp (GstMultiHandleSink * sink) } CLIENTS_UNLOCK (sink); } +#endif +void +gst_multi_handle_sink_client_init (GstMultiHandleClient * client, + GstSyncMethod sync_method) +{ + GTimeVal now; + + client->status = GST_CLIENT_STATUS_OK; + client->bufpos = -1; + client->flushcount = -1; + client->bufoffset = 0; + client->sending = NULL; + client->bytes_sent = 0; + client->dropped_buffers = 0; + client->avg_queue_size = 0; + client->first_buffer_ts = GST_CLOCK_TIME_NONE; + client->last_buffer_ts = GST_CLOCK_TIME_NONE; + client->new_connection = TRUE; + client->sync_method = sync_method; + client->currently_removing = FALSE; + + /* update start time */ + g_get_current_time (&now); + client->connect_time = GST_TIMEVAL_TO_TIME (now); + client->disconnect_time = 0; + /* set last activity time to connect time */ + client->last_activity_time = client->connect_time; +} + +#if 0 /* "add-full" signal implementation */ void gst_multi_handle_sink_add_full (GstMultiHandleSink * sink, GSocket * socket, @@ -779,30 +848,10 @@ gst_multi_handle_sink_add_full (GstMultiHandleSink * sink, GSocket * socket, /* create client datastructure */ client = g_new0 (GstSocketClient, 1); client->socket = G_SOCKET (g_object_ref (socket)); - client->status = GST_CLIENT_STATUS_OK; - client->bufpos = -1; - client->flushcount = -1; - client->bufoffset = 0; - client->sending = NULL; - client->bytes_sent = 0; - client->dropped_buffers = 0; - client->avg_queue_size = 0; - client->first_buffer_ts = GST_CLOCK_TIME_NONE; - client->last_buffer_ts = GST_CLOCK_TIME_NONE; - client->new_connection = TRUE; client->burst_min_format = min_format; client->burst_min_value = min_value; client->burst_max_format = max_format; client->burst_max_value = max_value; - client->sync_method = sync_method; - client->currently_removing = FALSE; - - /* update start time */ - g_get_current_time (&now); - client->connect_time = GST_TIMEVAL_TO_TIME (now); - client->disconnect_time = 0; - /* set last activity time to connect time */ - client->last_activity_time = client->connect_time; CLIENTS_LOCK (sink); @@ -984,7 +1033,7 @@ gst_multi_handle_sink_get_stats (GstMultiHandleSink * sink, GSocket * socket) if (client != NULL) { guint64 interval; - result = gst_structure_new_empty ("multisocketsink-stats"); + result = gst_structure_new_empty ("multihandlesink-stats"); if (client->disconnect_time == 0) { GTimeVal nowtv; @@ -1013,13 +1062,15 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket); - result = gst_structure_new_empty ("multisocketsink-stats"); + result = gst_structure_new_empty ("multihandlesink-stats"); } return result; } +#endif -/* should be called with the clientslock helt. +#if 0 +/* should be called with the clientslock held. * Note that we don't close the fd as we didn't open it in the first * place. An application should connect to the client-fd-removed signal and * close the fd itself. @@ -1133,7 +1184,9 @@ gst_multi_handle_sink_remove_client_link (GstMultiHandleSink * sink, CLIENTS_LOCK (sink); } +#endif +#if 0 /* handle a read on a client socket, * which either indicates a close or should be ignored * returns FALSE if some error occured or the client closed. */ @@ -1187,19 +1240,9 @@ gst_multi_handle_sink_handle_client_read (GstMultiHandleSink * sink, return ret; } +#endif -static gboolean -is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer) -{ - if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { - return FALSE; - } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) { - return TRUE; - } - - return FALSE; -} - +#if 0 /* queue the given buffer for the given client */ static gboolean gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * sink, @@ -1318,6 +1361,20 @@ gst_multi_handle_sink_client_queue_buffer (GstMultiHandleSink * sink, return TRUE; } +#endif + +#if 0 +static gboolean +is_sync_frame (GstMultiHandleSink * sink, GstBuffer * buffer) +{ + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { + return FALSE; + } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) { + return TRUE; + } + + return FALSE; +} /* find the keyframe in the list of buffers starting the * search from @idx. @direction as -1 will search backwards, @@ -1349,10 +1406,12 @@ find_syncframe (GstMultiHandleSink * sink, gint idx, gint direction) } return result; } +#endif #define find_next_syncframe(s,i) find_syncframe(s,i,1) #define find_prev_syncframe(s,i) find_syncframe(s,i,-1) +#if 0 /* Get the number of buffers from the buffer queue needed to satisfy * the maximum max in the configured units. * If units are not BUFFERS, and there are insufficient buffers in the @@ -1409,7 +1468,9 @@ get_buffers_max (GstMultiHandleSink * sink, gint64 max) return max; } } +#endif +#if 0 /* find the positions in the buffer queue where *_min and *_max * is satisfied */ @@ -1553,7 +1614,9 @@ assign_value (GstFormat format, guint64 value, gint * bytes, gint * buffers, } return res; } +#endif +#if 0 /* count the index in the buffer queue to satisfy the given unit * and value pair starting from buffer at index 0. * @@ -1577,7 +1640,9 @@ count_burst_unit (GstMultiHandleSink * sink, gint * min_idx, return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, max_idx, bytes_max, buffers_max, time_max); } +#endif +#if 0 /* decide where in the current buffer queue this new client should start * receiving buffers from. * This function is called whenever a client is connected and has not yet @@ -1778,7 +1843,9 @@ gst_multi_handle_sink_new_client (GstMultiHandleSink * sink, } return result; } +#endif +#if 0 /* Handle a write on a client, * which indicates a read request from a client. * @@ -1968,7 +2035,9 @@ write_error: return FALSE; } } +#endif +#if 0 /* calculate the new position for a client after recovery. This function * does not update the client position but merely returns the required * position. @@ -2021,7 +2090,9 @@ gst_multi_handle_sink_recover_client (GstMultiHandleSink * sink, } return newbufpos; } +#endif +#if 0 /* Queue a buffer on the global queue. * * This function adds the buffer to the front of a GArray. It removes the @@ -2210,7 +2281,9 @@ restart: sink->buffers_queued = max_buffer_usage; CLIENTS_UNLOCK (sink); } +#endif +#if 0 /* Handle the clients. This is called when a socket becomes ready * to read or writable. Badly behaving clients are put on a * garbage list and removed. @@ -2271,7 +2344,9 @@ done: return ret; } +#endif +#if 0 static gboolean gst_multi_handle_sink_timeout (GstMultiHandleSink * sink) { @@ -2296,7 +2371,9 @@ gst_multi_handle_sink_timeout (GstMultiHandleSink * sink) return FALSE; } +#endif +#if 0 /* we handle the client communication in another thread so that we do not block * the gstreamer thread while we select() on the client fds */ static gpointer @@ -2328,7 +2405,9 @@ gst_multi_handle_sink_thread (GstMultiHandleSink * sink) return NULL; } +#endif +#if 0 static GstFlowReturn gst_multi_handle_sink_render (GstBaseSink * bsink, GstBuffer * buf) { @@ -2435,66 +2514,72 @@ no_caps: } #endif } +#endif static void gst_multi_handle_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstMultiHandleSink *multisocketsink; + GstMultiHandleSink *multihandlesink; - multisocketsink = GST_MULTI_HANDLE_SINK (object); + multihandlesink = GST_MULTI_HANDLE_SINK (object); switch (prop_id) { +#if 0 case PROP_BUFFERS_MAX: - multisocketsink->units_max = g_value_get_int (value); + multihandlesink->units_max = g_value_get_int (value); break; case PROP_BUFFERS_SOFT_MAX: - multisocketsink->units_soft_max = g_value_get_int (value); + multihandlesink->units_soft_max = g_value_get_int (value); break; +#endif case PROP_TIME_MIN: - multisocketsink->time_min = g_value_get_int64 (value); + multihandlesink->time_min = g_value_get_int64 (value); break; case PROP_BYTES_MIN: - multisocketsink->bytes_min = g_value_get_int (value); + multihandlesink->bytes_min = g_value_get_int (value); break; case PROP_BUFFERS_MIN: - multisocketsink->buffers_min = g_value_get_int (value); + multihandlesink->buffers_min = g_value_get_int (value); break; +#if 0 case PROP_UNIT_TYPE: - multisocketsink->unit_type = g_value_get_enum (value); + multihandlesink->unit_type = g_value_get_enum (value); break; case PROP_UNITS_MAX: - multisocketsink->units_max = g_value_get_int64 (value); + multihandlesink->units_max = g_value_get_int64 (value); break; case PROP_UNITS_SOFT_MAX: - multisocketsink->units_soft_max = g_value_get_int64 (value); + multihandlesink->units_soft_max = g_value_get_int64 (value); break; +#endif case PROP_RECOVER_POLICY: - multisocketsink->recover_policy = g_value_get_enum (value); + multihandlesink->recover_policy = g_value_get_enum (value); break; case PROP_TIMEOUT: - multisocketsink->timeout = g_value_get_uint64 (value); + multihandlesink->timeout = g_value_get_uint64 (value); break; case PROP_SYNC_METHOD: - multisocketsink->def_sync_method = g_value_get_enum (value); + multihandlesink->def_sync_method = g_value_get_enum (value); break; +#if 0 case PROP_BURST_FORMAT: - multisocketsink->def_burst_format = g_value_get_enum (value); + multihandlesink->def_burst_format = g_value_get_enum (value); break; case PROP_BURST_VALUE: - multisocketsink->def_burst_value = g_value_get_uint64 (value); + multihandlesink->def_burst_value = g_value_get_uint64 (value); break; case PROP_QOS_DSCP: - multisocketsink->qos_dscp = g_value_get_int (value); - setup_dscp (multisocketsink); + multihandlesink->qos_dscp = g_value_get_int (value); + setup_dscp (multihandlesink); break; case PROP_HANDLE_READ: - multisocketsink->handle_read = g_value_get_boolean (value); + multihandlesink->handle_read = g_value_get_boolean (value); break; case PROP_RESEND_STREAMHEADER: - multisocketsink->resend_streamheader = g_value_get_boolean (value); + multihandlesink->resend_streamheader = g_value_get_boolean (value); break; - +#endif default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2505,79 +2590,84 @@ static void gst_multi_handle_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstMultiHandleSink *multisocketsink; + GstMultiHandleSink *multihandlesink; - multisocketsink = GST_MULTI_HANDLE_SINK (object); + multihandlesink = GST_MULTI_HANDLE_SINK (object); switch (prop_id) { +#if 0 case PROP_BUFFERS_MAX: - g_value_set_int (value, multisocketsink->units_max); + g_value_set_int (value, multihandlesink->units_max); break; case PROP_BUFFERS_SOFT_MAX: - g_value_set_int (value, multisocketsink->units_soft_max); + g_value_set_int (value, multihandlesink->units_soft_max); break; +#endif case PROP_TIME_MIN: - g_value_set_int64 (value, multisocketsink->time_min); + g_value_set_int64 (value, multihandlesink->time_min); break; case PROP_BYTES_MIN: - g_value_set_int (value, multisocketsink->bytes_min); + g_value_set_int (value, multihandlesink->bytes_min); break; case PROP_BUFFERS_MIN: - g_value_set_int (value, multisocketsink->buffers_min); + g_value_set_int (value, multihandlesink->buffers_min); break; +#if 0 case PROP_BUFFERS_QUEUED: - g_value_set_uint (value, multisocketsink->buffers_queued); + g_value_set_uint (value, multihandlesink->buffers_queued); break; case PROP_BYTES_QUEUED: - g_value_set_uint (value, multisocketsink->bytes_queued); + g_value_set_uint (value, multihandlesink->bytes_queued); break; case PROP_TIME_QUEUED: - g_value_set_uint64 (value, multisocketsink->time_queued); + g_value_set_uint64 (value, multihandlesink->time_queued); break; case PROP_UNIT_TYPE: - g_value_set_enum (value, multisocketsink->unit_type); + g_value_set_enum (value, multihandlesink->unit_type); break; case PROP_UNITS_MAX: - g_value_set_int64 (value, multisocketsink->units_max); + g_value_set_int64 (value, multihandlesink->units_max); break; case PROP_UNITS_SOFT_MAX: - g_value_set_int64 (value, multisocketsink->units_soft_max); + g_value_set_int64 (value, multihandlesink->units_soft_max); break; +#endif case PROP_RECOVER_POLICY: - g_value_set_enum (value, multisocketsink->recover_policy); + g_value_set_enum (value, multihandlesink->recover_policy); break; case PROP_TIMEOUT: - g_value_set_uint64 (value, multisocketsink->timeout); + g_value_set_uint64 (value, multihandlesink->timeout); break; case PROP_SYNC_METHOD: - g_value_set_enum (value, multisocketsink->def_sync_method); + g_value_set_enum (value, multihandlesink->def_sync_method); break; case PROP_BYTES_TO_SERVE: - g_value_set_uint64 (value, multisocketsink->bytes_to_serve); + g_value_set_uint64 (value, multihandlesink->bytes_to_serve); break; case PROP_BYTES_SERVED: - g_value_set_uint64 (value, multisocketsink->bytes_served); + g_value_set_uint64 (value, multihandlesink->bytes_served); break; +#if 0 case PROP_BURST_FORMAT: - g_value_set_enum (value, multisocketsink->def_burst_format); + g_value_set_enum (value, multihandlesink->def_burst_format); break; case PROP_BURST_VALUE: - g_value_set_uint64 (value, multisocketsink->def_burst_value); + g_value_set_uint64 (value, multihandlesink->def_burst_value); break; case PROP_QOS_DSCP: - g_value_set_int (value, multisocketsink->qos_dscp); + g_value_set_int (value, multihandlesink->qos_dscp); break; case PROP_HANDLE_READ: - g_value_set_boolean (value, multisocketsink->handle_read); + g_value_set_boolean (value, multihandlesink->handle_read); break; case PROP_RESEND_STREAMHEADER: - g_value_set_boolean (value, multisocketsink->resend_streamheader); + g_value_set_boolean (value, multihandlesink->resend_streamheader); break; case PROP_NUM_SOCKETS: g_value_set_uint (value, - g_hash_table_size (multisocketsink->socket_hash)); + g_hash_table_size (multihandlesink->socket_hash)); break; - +#endif default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2585,6 +2675,7 @@ gst_multi_handle_sink_get_property (GObject * object, guint prop_id, } +#if 0 /* create a socket for sending to remote machine */ static gboolean gst_multi_handle_sink_start (GstBaseSink * bsink) @@ -2643,7 +2734,9 @@ multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data) { return TRUE; } +#endif +#if 0 static gboolean gst_multi_handle_sink_stop (GstBaseSink * bsink) { @@ -2671,7 +2764,7 @@ gst_multi_handle_sink_stop (GstBaseSink * bsink) } /* free the clients */ - gst_multi_handle_sink_clear (this); + fclass->clear (this); if (this->streamheader) { g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); @@ -2707,7 +2800,9 @@ gst_multi_handle_sink_stop (GstBaseSink * bsink) return TRUE; } +#endif +#if 0 static GstStateChangeReturn gst_multi_handle_sink_change_state (GstElement * element, GstStateChange transition) @@ -2786,3 +2881,4 @@ gst_multi_handle_sink_unlock_stop (GstBaseSink * bsink) return TRUE; } +#endif diff --git a/gst/tcp/gstmultihandlesink.h b/gst/tcp/gstmultihandlesink.h index b5d917c1a5..efbff1eb55 100644 --- a/gst/tcp/gstmultihandlesink.h +++ b/gst/tcp/gstmultihandlesink.h @@ -47,13 +47,11 @@ G_BEGIN_DECLS typedef struct _GstMultiHandleSink GstMultiHandleSink; typedef struct _GstMultiHandleSinkClass GstMultiHandleSinkClass; -#if 0 typedef enum { GST_MULTI_HANDLE_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), GST_MULTI_HANDLE_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) } GstMultiHandleSinkFlags; -#endif /** * GstRecoverPolicy: @@ -122,13 +120,9 @@ typedef enum GST_CLIENT_STATUS_FLUSHING = 6 } GstClientStatus; -#if 0 /* structure for a client */ typedef struct { - GSocket *socket; - GSource *source; - gint bufpos; /* position of this client in the global queue */ gint flushcount; /* the remaining number of buffers to flush out or -1 if the client is not flushing. */ @@ -141,15 +135,18 @@ typedef struct { gboolean discont; gboolean new_connection; - gboolean currently_removing; + /* method to sync client when connecting */ GstSyncMethod sync_method; +// FIXME: refactor format vs unit +#if 0 GstFormat burst_min_format; guint64 burst_min_value; GstFormat burst_max_format; guint64 burst_max_value; +#endif GstCaps *caps; /* caps of last queued buffer */ @@ -162,8 +159,7 @@ typedef struct { guint64 avg_queue_size; guint64 first_buffer_ts; guint64 last_buffer_ts; -} GstSocketClient; -#endif +} GstMultiHandleClient; #define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock)) #define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock)) @@ -257,17 +253,28 @@ struct _GstMultiHandleSinkClass { GType gst_multi_handle_sink_get_type (void); -#if 0 void gst_multi_handle_sink_add (GstMultiHandleSink *sink, GSocket *socket); void gst_multi_handle_sink_add_full (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync, GstFormat min_format, guint64 min_value, GstFormat max_format, guint64 max_value); void gst_multi_handle_sink_remove (GstMultiHandleSink *sink, GSocket *socket); void gst_multi_handle_sink_remove_flush (GstMultiHandleSink *sink, GSocket *socket); -void gst_multi_handle_sink_clear (GstMultiHandleSink *sink); GstStructure* gst_multi_handle_sink_get_stats (GstMultiHandleSink *sink, GSocket *socket); +void gst_multi_handle_sink_client_init (GstMultiHandleClient * client, GstSyncMethod sync_method); + +// FIXME: make static again after refactoring +#define GST_TYPE_RECOVER_POLICY (gst_multi_handle_sink_recover_policy_get_type()) +GType +gst_multi_handle_sink_recover_policy_get_type (void); +#define GST_TYPE_SYNC_METHOD (gst_multi_handle_sink_sync_method_get_type()) +GType +gst_multi_handle_sink_sync_method_get_type (void); +#define GST_TYPE_CLIENT_STATUS (gst_multi_handle_sink_client_status_get_type()) +GType +gst_multi_handle_sink_client_status_get_type (void); + + G_END_DECLS -#endif #endif /* __GST_MULTI_HANDLE_SINK_H__ */ diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index 14c46f18da..b01575db7c 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -131,7 +131,6 @@ enum SIGNAL_ADD_BURST, SIGNAL_REMOVE, SIGNAL_REMOVE_FLUSH, - SIGNAL_CLEAR, SIGNAL_GET_STATS, /* signals */ @@ -147,15 +146,9 @@ enum #define DEFAULT_MODE 1 #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_TIME_MIN -1 -#define DEFAULT_BYTES_MIN -1 -#define DEFAULT_BUFFERS_MIN -1 #define DEFAULT_UNIT_TYPE GST_FORMAT_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 -#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE -#define DEFAULT_TIMEOUT 0 -#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST #define DEFAULT_BURST_FORMAT GST_FORMAT_UNDEFINED #define DEFAULT_BURST_VALUE 0 @@ -180,16 +173,6 @@ enum PROP_BUFFERS_MAX, PROP_BUFFERS_SOFT_MAX, - PROP_TIME_MIN, - PROP_BYTES_MIN, - PROP_BUFFERS_MIN, - - PROP_RECOVER_POLICY, - PROP_TIMEOUT, - PROP_SYNC_METHOD, - PROP_BYTES_TO_SERVE, - PROP_BYTES_SERVED, - PROP_BURST_FORMAT, PROP_BURST_VALUE, @@ -204,84 +187,6 @@ enum PROP_LAST }; -#define GST_TYPE_RECOVER_POLICY (gst_multi_socket_sink_recover_policy_get_type()) -static GType -gst_multi_socket_sink_recover_policy_get_type (void) -{ - static GType recover_policy_type = 0; - static const GEnumValue recover_policy[] = { - {GST_RECOVER_POLICY_NONE, - "Do not try to recover", "none"}, - {GST_RECOVER_POLICY_RESYNC_LATEST, - "Resync client to latest buffer", "latest"}, - {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, - "Resync client to soft limit", "soft-limit"}, - {GST_RECOVER_POLICY_RESYNC_KEYFRAME, - "Resync client to most recent keyframe", "keyframe"}, - {0, NULL, NULL}, - }; - - if (!recover_policy_type) { - recover_policy_type = - g_enum_register_static ("GstMultiSocketSinkRecoverPolicy", - recover_policy); - } - return recover_policy_type; -} - -#define GST_TYPE_SYNC_METHOD (gst_multi_socket_sink_sync_method_get_type()) -static GType -gst_multi_socket_sink_sync_method_get_type (void) -{ - static GType sync_method_type = 0; - static const GEnumValue sync_method[] = { - {GST_SYNC_METHOD_LATEST, - "Serve starting from the latest buffer", "latest"}, - {GST_SYNC_METHOD_NEXT_KEYFRAME, - "Serve starting from the next keyframe", "next-keyframe"}, - {GST_SYNC_METHOD_LATEST_KEYFRAME, - "Serve everything since the latest keyframe (burst)", - "latest-keyframe"}, - {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"}, - {GST_SYNC_METHOD_BURST_KEYFRAME, - "Serve burst-value data starting on a keyframe", - "burst-keyframe"}, - {GST_SYNC_METHOD_BURST_WITH_KEYFRAME, - "Serve burst-value data preferably starting on a keyframe", - "burst-with-keyframe"}, - {0, NULL, NULL}, - }; - - if (!sync_method_type) { - sync_method_type = - g_enum_register_static ("GstMultiSocketSinkSyncMethod", sync_method); - } - return sync_method_type; -} - -#define GST_TYPE_CLIENT_STATUS (gst_multi_socket_sink_client_status_get_type()) -static GType -gst_multi_socket_sink_client_status_get_type (void) -{ - static GType client_status_type = 0; - static const GEnumValue client_status[] = { - {GST_CLIENT_STATUS_OK, "ok", "ok"}, - {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"}, - {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"}, - {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"}, - {GST_CLIENT_STATUS_ERROR, "Error", "error"}, - {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"}, - {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"}, - {0, NULL, NULL}, - }; - - if (!client_status_type) { - client_status_type = - g_enum_register_static ("GstMultiSocketSinkClientStatus", - client_status); - } - return client_status_type; -} static void gst_multi_socket_sink_finalize (GObject * object); @@ -303,7 +208,8 @@ static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); #define gst_multi_socket_sink_parent_class parent_class -G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink, GST_TYPE_BASE_SINK); +G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink, + GST_TYPE_MULTI_HANDLE_SINK); static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 }; @@ -313,10 +219,12 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) GObjectClass *gobject_class; GstElementClass *gstelement_class; GstBaseSinkClass *gstbasesink_class; + GstMultiHandleSinkClass *gstmultihandlesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; + gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass; gobject_class->set_property = gst_multi_socket_sink_set_property; gobject_class->get_property = gst_multi_socket_sink_get_property; @@ -333,22 +241,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_MIN, - g_param_spec_int ("bytes-min", "Bytes min", - "min number of bytes to queue (-1 = as little as possible)", -1, - G_MAXINT, DEFAULT_BYTES_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIME_MIN, - g_param_spec_int64 ("time-min", "Time min", - "min number of time to queue (-1 = as little as possible)", -1, - G_MAXINT64, DEFAULT_TIME_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN, - g_param_spec_int ("buffers-min", "Buffers min", - "min number of buffers to queue (-1 = as few as possible)", -1, - G_MAXINT, DEFAULT_BUFFERS_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", @@ -379,29 +271,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); #endif - g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY, - g_param_spec_enum ("recover-policy", "Recover Policy", - "How to recover when client reaches the soft max", - GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIMEOUT, - g_param_spec_uint64 ("timeout", "Timeout", - "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)", - 0, G_MAXUINT64, DEFAULT_TIMEOUT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SYNC_METHOD, - g_param_spec_enum ("sync-method", "Sync Method", - "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD, - DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE, - g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", - "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_SERVED, - g_param_spec_uint64 ("bytes-served", "Bytes served", - "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_FORMAT, g_param_spec_enum ("burst-format", "Burst format", "The format of the burst units (when sync-method is burst[[-with]-keyframe])", @@ -506,19 +375,6 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_SOCKET); - /** - * GstMultiSocketSink::clear: - * @gstmultisocketsink: the multisocketsink element to emit this signal on - * - * Remove all sockets from multisocketsink. Since multisocketsink did not - * open sockets itself, it does not explicitly close the sockets. The application - * should do so by connecting to the client-socket-removed callback. - */ - gst_multi_socket_sink_signals[SIGNAL_CLEAR] = - g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, - G_STRUCT_OFFSET (GstMultiSocketSinkClass, clear), NULL, NULL, - g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /** * GstMultiSocketSink::get-stats: @@ -612,11 +468,13 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop); + gstmultihandlesink_class->clear = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_clear); + klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add); klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove); klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush); - klass->clear = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_clear); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats); GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0, @@ -626,7 +484,7 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) static void gst_multi_socket_sink_init (GstMultiSocketSink * this) { - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_SOCKET_SINK_OPEN); + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN); CLIENTS_LOCK_INIT (this); this->clients = NULL; @@ -636,13 +494,7 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this) this->unit_type = DEFAULT_UNIT_TYPE; this->units_max = DEFAULT_UNITS_MAX; this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; - this->time_min = DEFAULT_TIME_MIN; - this->bytes_min = DEFAULT_BYTES_MIN; - this->buffers_min = DEFAULT_BUFFERS_MIN; - this->recover_policy = DEFAULT_RECOVER_POLICY; - this->timeout = DEFAULT_TIMEOUT; - this->def_sync_method = DEFAULT_SYNC_METHOD; this->def_burst_format = DEFAULT_BURST_FORMAT; this->def_burst_value = DEFAULT_BURST_VALUE; @@ -762,8 +614,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, GstFormat max_format, guint64 max_value) { GstSocketClient *client; + GstMultiHandleClient *mhclient; GList *clink; - GTimeVal now; GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, " "min_format %d, min_value %" G_GUINT64_FORMAT @@ -778,31 +630,14 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket, /* create client datastructure */ client = g_new0 (GstSocketClient, 1); + mhclient = (GstMultiHandleClient *) client; + gst_multi_handle_sink_client_init (mhclient, sync_method); client->socket = G_SOCKET (g_object_ref (socket)); - client->status = GST_CLIENT_STATUS_OK; - client->bufpos = -1; - client->flushcount = -1; - client->bufoffset = 0; - client->sending = NULL; - client->bytes_sent = 0; - client->dropped_buffers = 0; - client->avg_queue_size = 0; - client->first_buffer_ts = GST_CLOCK_TIME_NONE; - client->last_buffer_ts = GST_CLOCK_TIME_NONE; - client->new_connection = TRUE; + client->burst_min_format = min_format; client->burst_min_value = min_value; client->burst_max_format = max_format; client->burst_max_value = max_value; - client->sync_method = sync_method; - client->currently_removing = FALSE; - - /* update start time */ - g_get_current_time (&now); - client->connect_time = GST_TIMEVAL_TO_TIME (now); - client->disconnect_time = 0; - /* set last activity time to connect time */ - client->last_activity_time = client->connect_time; CLIENTS_LOCK (sink); @@ -850,13 +685,13 @@ wrong_limits: } duplicate: { - client->status = GST_CLIENT_STATUS_DUPLICATE; + mhclient->status = GST_CLIENT_STATUS_DUPLICATE; CLIENTS_UNLOCK (sink); GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing", socket); g_signal_emit (G_OBJECT (sink), gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, - client->status); + mhclient->status); g_free (client); return; } @@ -866,7 +701,10 @@ duplicate: void gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket) { - gst_multi_socket_sink_add_full (sink, socket, sink->def_sync_method, + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); + gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method, sink->def_burst_format, sink->def_burst_value, sink->def_burst_format, -1); } @@ -883,15 +721,16 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket) clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink != NULL) { GstSocketClient *client = clink->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - if (client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "[socket %p] Client already disconnecting with status %d", - socket, client->status); + socket, mhclient->status); goto done; } - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient->status = GST_CLIENT_STATUS_REMOVED; gst_multi_socket_sink_remove_client_link (sink, clink); } else { GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!", @@ -914,21 +753,22 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket) clink = g_hash_table_lookup (sink->socket_hash, socket); if (clink != NULL) { GstSocketClient *client = clink->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - if (client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "[socket %p] Client already disconnecting with status %d", - socket, client->status); + socket, mhclient->status); goto done; } /* take the position of the client as the number of buffers left to flush. * If the client was at position -1, we flush 0 buffers, 0 == flush 1 * buffer, etc... */ - client->flushcount = client->bufpos + 1; + mhclient->flushcount = mhclient->bufpos + 1; /* mark client as flushing. We can not remove the client right away because * it might have some buffers to flush in the ->sending queue. */ - client->status = GST_CLIENT_STATUS_FLUSHING; + mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!", socket); @@ -940,10 +780,11 @@ done: /* can be called both through the signal (i.e. from any thread) or when * stopping, after the writing thread has shut down */ void -gst_multi_socket_sink_clear (GstMultiSocketSink * sink) +gst_multi_socket_sink_clear (GstMultiHandleSink * mhsink) { GList *clients; guint32 cookie; + GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink); GST_DEBUG_OBJECT (sink, "clearing all clients"); @@ -951,15 +792,15 @@ gst_multi_socket_sink_clear (GstMultiSocketSink * sink) restart: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = clients->next) { - GstSocketClient *client; + GstMultiHandleClient *mhclient; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients"); goto restart; } - client = clients->data; - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient = (GstMultiHandleClient *) clients->data; + mhclient->status = GST_CLIENT_STATUS_REMOVED; gst_multi_socket_sink_remove_client_link (sink, clients); } @@ -982,29 +823,30 @@ gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket) client = clink->data; if (client != NULL) { + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; guint64 interval; result = gst_structure_new_empty ("multisocketsink-stats"); - if (client->disconnect_time == 0) { + if (mhclient->disconnect_time == 0) { GTimeVal nowtv; g_get_current_time (&nowtv); - interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time; + interval = GST_TIMEVAL_TO_TIME (nowtv) - mhclient->connect_time; } else { - interval = client->disconnect_time - client->connect_time; + interval = mhclient->disconnect_time - mhclient->connect_time; } gst_structure_set (result, - "bytes-sent", G_TYPE_UINT64, client->bytes_sent, - "connect-time", G_TYPE_UINT64, client->connect_time, - "disconnect-time", G_TYPE_UINT64, client->disconnect_time, + "bytes-sent", G_TYPE_UINT64, mhclient->bytes_sent, + "connect-time", G_TYPE_UINT64, mhclient->connect_time, + "disconnect-time", G_TYPE_UINT64, mhclient->disconnect_time, "connected-duration", G_TYPE_UINT64, interval, - "last-activatity-time", G_TYPE_UINT64, client->last_activity_time, - "dropped-buffers", G_TYPE_UINT64, client->dropped_buffers, - "first-buffer-ts", G_TYPE_UINT64, client->first_buffer_ts, - "last-buffer-ts", G_TYPE_UINT64, client->last_buffer_ts, NULL); + "last-activatity-time", G_TYPE_UINT64, mhclient->last_activity_time, + "dropped-buffers", G_TYPE_UINT64, mhclient->dropped_buffers, + "first-buffer-ts", G_TYPE_UINT64, mhclient->first_buffer_ts, + "last-buffer-ts", G_TYPE_UINT64, mhclient->last_buffer_ts, NULL); } noclient: @@ -1031,22 +873,23 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, GSocket *socket; GTimeVal now; GstSocketClient *client = link->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstMultiSocketSinkClass *fclass; fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink); socket = client->socket; - if (client->currently_removing) { + if (mhclient->currently_removing) { GST_WARNING_OBJECT (sink, "[socket %p] client is already being removed", socket); return; } else { - client->currently_removing = TRUE; + mhclient->currently_removing = TRUE; } /* FIXME: if we keep track of ip we can log it here and signal */ - switch (client->status) { + switch (mhclient->status) { case GST_CLIENT_STATUS_OK: GST_WARNING_OBJECT (sink, "[socket %p] removing client %p for no reason", socket, client); @@ -1073,7 +916,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, default: GST_WARNING_OBJECT (sink, "[socket %p] removing client %p with invalid reason %d", socket, - client, client->status); + client, mhclient->status); break; } @@ -1084,16 +927,16 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, } g_get_current_time (&now); - client->disconnect_time = GST_TIMEVAL_TO_TIME (now); + mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now); /* free client buffers */ - g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (client->sending); - client->sending = NULL; + g_slist_foreach (mhclient->sending, (GFunc) gst_mini_object_unref, NULL); + g_slist_free (mhclient->sending); + mhclient->sending = NULL; - if (client->caps) - gst_caps_unref (client->caps); - client->caps = NULL; + if (mhclient->caps) + gst_caps_unref (mhclient->caps); + mhclient->caps = NULL; /* unlock the mutex before signaling because the signal handler * might query some properties */ @@ -1101,7 +944,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiSocketSink * sink, g_signal_emit (G_OBJECT (sink), gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket, - client->status); + mhclient->status); /* lock again before we remove the client completely */ CLIENTS_LOCK (sink); @@ -1146,6 +989,7 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, gssize nread; GError *err = NULL; gboolean first = TRUE; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read", client->socket); @@ -1172,12 +1016,12 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing", client->socket); - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (nread < 0) { GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s", client->socket, err->message); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } @@ -1205,6 +1049,7 @@ static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, GstSocketClient * client, GstBuffer * buffer) { + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstCaps *caps; /* TRUE: send them if the new caps have them */ @@ -1215,15 +1060,15 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, * buffers (because it's a new client, or because they changed) */ caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (sink)); - if (!client->caps) { + if (!mhclient->caps) { GST_DEBUG_OBJECT (sink, "[socket %p] no previous caps for this client, send streamheader", client->socket); send_streamheader = TRUE; - client->caps = gst_caps_ref (caps); + mhclient->caps = gst_caps_ref (caps); } else { /* there were previous caps recorded, so compare */ - if (!gst_caps_is_equal (caps, client->caps)) { + if (!gst_caps_is_equal (caps, mhclient->caps)) { const GValue *sh1, *sh2; /* caps are not equal, but could still have the same streamheader */ @@ -1235,7 +1080,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, client->socket); } else { /* there is a new streamheader */ - s = gst_caps_get_structure (client->caps, 0); + s = gst_caps_get_structure (mhclient->caps, 0); if (!gst_structure_has_field (s, "streamheader")) { /* no previous streamheader, so send the new one */ GST_DEBUG_OBJECT (sink, @@ -1264,8 +1109,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, } } /* Replace the old caps */ - gst_caps_unref (client->caps); - client->caps = gst_caps_ref (caps); + gst_caps_unref (mhclient->caps); + mhclient->caps = gst_caps_ref (caps); } if (G_UNLIKELY (send_streamheader)) { @@ -1301,7 +1146,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); - client->sending = g_slist_append (client->sending, buffer); + mhclient->sending = g_slist_append (mhclient->sending, buffer); } } } @@ -1314,7 +1159,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiSocketSink * sink, gst_buffer_get_size (buffer)); gst_buffer_ref (buffer); - client->sending = g_slist_append (client->sending, buffer); + mhclient->sending = g_slist_append (mhclient->sending, buffer); return TRUE; } @@ -1591,29 +1436,30 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, GstSocketClient * client) { gint result; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GST_DEBUG_OBJECT (sink, "[socket %p] new client, deciding where to start in queue", client->socket); GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", sink->bufqueue->len); - switch (client->sync_method) { + switch (mhclient->sync_method) { case GST_SYNC_METHOD_LATEST: /* no syncing, we are happy with whatever the client is going to get */ - result = client->bufpos; + result = mhclient->bufpos; GST_DEBUG_OBJECT (sink, "[socket %p] SYNC_METHOD_LATEST, position %d", client->socket, result); break; case GST_SYNC_METHOD_NEXT_KEYFRAME: { - /* if one of the new buffers (between client->bufpos and 0) in the queue + /* if one of the new buffers (between mhclient->bufpos and 0) in the queue * is a sync point, we can proceed, otherwise we need to keep waiting */ GST_LOG_OBJECT (sink, "[socket %p] new client, bufpos %d, waiting for keyframe", - client->socket, client->bufpos); + client->socket, mhclient->bufpos); - result = find_prev_syncframe (sink, client->bufpos); + result = find_prev_syncframe (sink, mhclient->bufpos); if (result != -1) { GST_DEBUG_OBJECT (sink, "[socket %p] SYNC_METHOD_NEXT_KEYFRAME: result %d", @@ -1626,7 +1472,7 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, GST_LOG_OBJECT (sink, "[socket %p] new client, skipping buffer(s), no syncpoint found", client->socket); - client->bufpos = -1; + mhclient->bufpos = -1; break; } case GST_SYNC_METHOD_LATEST_KEYFRAME: @@ -1651,9 +1497,9 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, "[socket %p] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " "switching to SYNC_METHOD_NEXT_KEYFRAME", client->socket); /* throw client to the waiting state */ - client->bufpos = -1; + mhclient->bufpos = -1; /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; break; } case GST_SYNC_METHOD_BURST: @@ -1726,9 +1572,9 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); /* throw client to the waiting state */ - client->bufpos = -1; + mhclient->bufpos = -1; /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; + mhclient->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; result = -1; break; } @@ -1772,8 +1618,8 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, break; } default: - g_warning ("unknown sync method %d", client->sync_method); - result = client->bufpos; + g_warning ("unknown sync method %d", mhclient->sync_method); + result = mhclient->bufpos; break; } return result; @@ -1788,13 +1634,13 @@ gst_multi_socket_sink_new_client (GstMultiSocketSink * sink, * We first check to see if we need to send streamheaders. If so, we queue them. * * Then we run into the main loop that tries to send as many buffers as - * possible. It will first exhaust the client->sending queue and if the queue + * possible. It will first exhaust the mhclient->sending queue and if the queue * is empty, it will pick a buffer from the global queue. * - * Sending the buffers from the client->sending queue is basically writing + * Sending the buffers from the mhclient->sending queue is basically writing * the bytes to the socket and maintaining a count of the bytes that were * sent. When the buffer is completely sent, it is removed from the - * client->sending queue and we try to pick a new buffer for sending. + * mhclient->sending queue and we try to pick a new buffer for sending. * * When the sending returns a partial buffer we stop sending more data as * the next send operation could block. @@ -1811,19 +1657,23 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, GstClockTime now; GTimeVal nowtv; GError *err = NULL; + GstMultiHandleSink *mhsink; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; + + mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); - flushing = client->status == GST_CLIENT_STATUS_FLUSHING; + flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING; more = TRUE; do { gint maxsize; - if (!client->sending) { + if (!mhclient->sending) { /* client is not working on a buffer */ - if (client->bufpos == -1) { + if (mhclient->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ if (client->source) { @@ -1832,7 +1682,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, client->source = NULL; } /* if we flushed out all of the client buffers, we can stop */ - if (client->flushcount == 0) + if (mhclient->flushcount == 0) goto flushed; return TRUE; @@ -1843,13 +1693,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* for new connections, we need to find a good spot in the * bufqueue to start streaming from */ - if (client->new_connection && !flushing) { + if (mhclient->new_connection && !flushing) { gint position = gst_multi_socket_sink_new_client (sink, client); if (position >= 0) { /* we got a valid spot in the queue */ - client->new_connection = FALSE; - client->bufpos = position; + mhclient->new_connection = FALSE; + mhclient->bufpos = position; } else { /* cannot send data to this client yet */ if (client->source) { @@ -1862,51 +1712,51 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, } /* we flushed all remaining buffers, no need to get a new one */ - if (client->flushcount == 0) + if (mhclient->flushcount == 0) goto flushed; /* grab buffer */ - buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); - client->bufpos--; + buf = g_array_index (sink->bufqueue, GstBuffer *, mhclient->bufpos); + mhclient->bufpos--; /* update stats */ timestamp = GST_BUFFER_TIMESTAMP (buf); - if (client->first_buffer_ts == GST_CLOCK_TIME_NONE) - client->first_buffer_ts = timestamp; + if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE) + mhclient->first_buffer_ts = timestamp; if (timestamp != -1) - client->last_buffer_ts = timestamp; + mhclient->last_buffer_ts = timestamp; /* decrease flushcount */ - if (client->flushcount != -1) - client->flushcount--; + if (mhclient->flushcount != -1) + mhclient->flushcount--; GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", - socket, client, client->bufpos); + socket, client, mhclient->bufpos); /* queueing a buffer will ref it */ gst_multi_socket_sink_client_queue_buffer (sink, client, buf); /* need to start from the first byte for this new buffer */ - client->bufoffset = 0; + mhclient->bufoffset = 0; } } /* see if we need to send something */ - if (client->sending) { + if (mhclient->sending) { gssize wrote; GstBuffer *head; GstMapInfo map; /* pick first buffer from list */ - head = GST_BUFFER (client->sending->data); + head = GST_BUFFER (mhclient->sending->data); gst_buffer_map (head, &map, GST_MAP_READ); - maxsize = map.size - client->bufoffset; + maxsize = map.size - mhclient->bufoffset; /* try to write the complete buffer */ wrote = - g_socket_send (socket, (gchar *) map.data + client->bufoffset, + g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable, &err); gst_buffer_unmap (head, &map); @@ -1924,19 +1774,19 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, GST_LOG_OBJECT (sink, "partial write on %p of %" G_GSSIZE_FORMAT " bytes", socket, wrote); - client->bufoffset += wrote; + mhclient->bufoffset += wrote; more = FALSE; } else { /* complete buffer was written, we can proceed to the next one */ - client->sending = g_slist_remove (client->sending, head); + mhclient->sending = g_slist_remove (mhclient->sending, head); gst_buffer_unref (head); /* make sure we start from byte 0 for the next buffer */ - client->bufoffset = 0; + mhclient->bufoffset = 0; } /* update stats */ - client->bytes_sent += wrote; - client->last_activity_time = now; - sink->bytes_served += wrote; + mhclient->bytes_sent += wrote; + mhclient->last_activity_time = now; + mhsink->bytes_served += wrote; } } } while (more); @@ -1947,14 +1797,14 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, flushed: { GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket); - client->status = GST_CLIENT_STATUS_REMOVED; + mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing", socket); - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; g_clear_error (&err); return FALSE; } @@ -1964,7 +1814,7 @@ write_error: "[socket %p] could not write, removing client: %s", socket, err->message); g_clear_error (&err); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; } } @@ -1978,16 +1828,18 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, GstSocketClient * client) { gint newbufpos; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GST_WARNING_OBJECT (sink, "[socket %p] client %p is lagging at %d, recover using policy %d", - client->socket, client, client->bufpos, sink->recover_policy); + client->socket, client, mhclient->bufpos, mhsink->recover_policy); - switch (sink->recover_policy) { + switch (mhsink->recover_policy) { case GST_RECOVER_POLICY_NONE: /* do nothing, client will catch up or get kicked out when it reaches * the hard max */ - newbufpos = client->bufpos; + newbufpos = mhclient->bufpos; break; case GST_RECOVER_POLICY_RESYNC_LATEST: /* move to beginning of queue */ @@ -2028,7 +1880,7 @@ gst_multi_socket_sink_recover_client (GstMultiSocketSink * sink, * tail buffer if the max queue size is exceeded, unreffing the queued buffer. * Note that unreffing the buffer is not a problem as clients who * started writing out this buffer will still have a reference to it in the - * client->sending queue. + * mhclient->sending queue. * * After adding the buffer, we update all client positions in the queue. If * a client moves over the soft max, we start the recovery procedure for this @@ -2051,6 +1903,9 @@ gst_multi_socket_sink_queue_buffer (GstMultiSocketSink * sink, GstBuffer * buf) GstClockTime now; gint max_buffers, soft_max_buffers; guint cookie; + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -2080,6 +1935,7 @@ restart: cookie = sink->clients_cookie; for (clients = sink->clients; clients; clients = next) { GstSocketClient *client; + GstMultiHandleClient *mhclient; if (cookie != sink->clients_cookie) { GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting"); @@ -2087,22 +1943,23 @@ restart: } client = clients->data; + mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); - client->bufpos++; + mhclient->bufpos++; GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d", - client->socket, client, client->bufpos); + client->socket, client, mhclient->bufpos); /* check soft max if needed, recover client */ - if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) { + if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) { gint newpos; newpos = gst_multi_socket_sink_recover_client (sink, client); - if (newpos != client->bufpos) { - client->dropped_buffers += client->bufpos - newpos; - client->bufpos = newpos; - client->discont = TRUE; + if (newpos != mhclient->bufpos) { + mhclient->dropped_buffers += mhclient->bufpos - newpos; + mhclient->bufpos = newpos; + mhclient->discont = TRUE; GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d", - client->socket, client, client->bufpos); + client->socket, client, mhclient->bufpos); } else { GST_INFO_OBJECT (sink, "[socket %p] client %p not recovering position", @@ -2110,20 +1967,20 @@ restart: } } /* check hard max and timeout, remove client */ - if ((max_buffers > 0 && client->bufpos >= max_buffers) || - (sink->timeout > 0 - && now - client->last_activity_time > sink->timeout)) { + if ((max_buffers > 0 && mhclient->bufpos >= max_buffers) || + (mhsink->timeout > 0 + && now - mhclient->last_activity_time > mhsink->timeout)) { /* remove client */ GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing", client->socket, client); /* remove the client, the fd set will be cleared and the select thread * will be signaled */ - client->status = GST_CLIENT_STATUS_SLOW; + mhclient->status = GST_CLIENT_STATUS_SLOW; /* set client to invalid position while being removed */ - client->bufpos = -1; + mhclient->bufpos = -1; gst_multi_socket_sink_remove_client_link (sink, clients); continue; - } else if (client->bufpos == 0 || client->new_connection) { + } else if (mhclient->bufpos == 0 || mhclient->new_connection) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ if (!client->source) { @@ -2138,8 +1995,8 @@ restart: } } /* keep track of maximum buffer usage */ - if (client->bufpos > max_buffer_usage) { - max_buffer_usage = client->bufpos; + if (mhclient->bufpos > max_buffer_usage) { + max_buffer_usage = mhclient->bufpos; } } @@ -2150,13 +2007,14 @@ restart: GST_LOG_OBJECT (sink, "extending queue %d to respect time_min %" GST_TIME_FORMAT ", bytes_min %d, buffers_min %d", max_buffer_usage, - GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min); + GST_TIME_ARGS (mhsink->time_min), mhsink->bytes_min, + mhsink->buffers_min); /* get index where the limits are ok, we don't really care if all limits * are ok, we just queue as much as we need. We also don't compare against * the max limits. */ - find_limits (sink, &usage, sink->bytes_min, sink->buffers_min, - sink->time_min, &max, -1, -1, -1); + find_limits (sink, &usage, mhsink->bytes_min, mhsink->buffers_min, + mhsink->time_min, &max, -1, -1, -1); max_buffer_usage = MAX (max_buffer_usage, usage + 1); GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage); @@ -2165,8 +2023,8 @@ restart: /* now look for sync points and make sure there is at least one * sync point in the queue. We only do this if the LATEST_KEYFRAME or * BURST_KEYFRAME mode is selected */ - if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || - sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { + if (mhsink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || + mhsink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { /* no point in searching beyond the queue length */ gint limit = queuelen; GstBuffer *buf; @@ -2222,6 +2080,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket, GList *clink; GstSocketClient *client; gboolean ret = TRUE; + GstMultiHandleClient *mhclient; CLIENTS_LOCK (sink); clink = g_hash_table_lookup (sink->socket_hash, socket); @@ -2231,9 +2090,10 @@ gst_multi_socket_sink_socket_condition (GSocket * socket, } client = clink->data; + mhclient = (GstMultiHandleClient *) client; - if (client->status != GST_CLIENT_STATUS_FLUSHING - && client->status != GST_CLIENT_STATUS_OK) { + if (mhclient->status != GST_CLIENT_STATUS_FLUSHING + && mhclient->status != GST_CLIENT_STATUS_OK) { gst_multi_socket_sink_remove_client_link (sink, clink); ret = FALSE; goto done; @@ -2241,12 +2101,12 @@ gst_multi_socket_sink_socket_condition (GSocket * socket, if ((condition & G_IO_ERR)) { GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket); - client->status = GST_CLIENT_STATUS_ERROR; + mhclient->status = GST_CLIENT_STATUS_ERROR; gst_multi_socket_sink_remove_client_link (sink, clink); ret = FALSE; goto done; } else if ((condition & G_IO_HUP)) { - client->status = GST_CLIENT_STATUS_CLOSED; + mhclient->status = GST_CLIENT_STATUS_CLOSED; gst_multi_socket_sink_remove_client_link (sink, clink); ret = FALSE; goto done; @@ -2278,6 +2138,9 @@ gst_multi_socket_sink_timeout (GstMultiSocketSink * sink) GstClockTime now; GTimeVal nowtv; GList *clients; + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); g_get_current_time (&nowtv); now = GST_TIMEVAL_TO_TIME (nowtv); @@ -2285,10 +2148,13 @@ gst_multi_socket_sink_timeout (GstMultiSocketSink * sink) CLIENTS_LOCK (sink); for (clients = sink->clients; clients; clients = clients->next) { GstSocketClient *client; + GstMultiHandleClient *mhclient; client = clients->data; - if (sink->timeout > 0 && now - client->last_activity_time > sink->timeout) { - client->status = GST_CLIENT_STATUS_SLOW; + mhclient = (GstMultiHandleClient *) client; + if (mhsink->timeout > 0 + && now - mhclient->last_activity_time > mhsink->timeout) { + mhclient->status = GST_CLIENT_STATUS_SLOW; gst_multi_socket_sink_remove_client_link (sink, clients); } } @@ -2303,10 +2169,13 @@ static gpointer gst_multi_socket_sink_thread (GstMultiSocketSink * sink) { GSource *timeout = NULL; + GstMultiHandleSink *mhsink; + + mhsink = GST_MULTI_HANDLE_SINK (sink); while (sink->running) { - if (sink->timeout > 0) { - timeout = g_timeout_source_new (sink->timeout / GST_MSECOND); + if (mhsink->timeout > 0) { + timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND); g_source_set_callback (timeout, (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink), @@ -2337,8 +2206,10 @@ gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf) #if 0 GstCaps *bufcaps, *padcaps; #endif + GstMultiHandleSink *mhsink; sink = GST_MULTI_SOCKET_SINK (bsink); + mhsink = GST_MULTI_HANDLE_SINK (sink); g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_SOCKET_SINK_OPEN), GST_FLOW_FLUSHING); @@ -2421,7 +2292,7 @@ gst_multi_socket_sink_render (GstBaseSink * bsink, GstBuffer * buf) /* queue the buffer, this is a regular data buffer. */ gst_multi_socket_sink_queue_buffer (sink, buf); - sink->bytes_to_serve += gst_buffer_get_size (buf); + mhsink->bytes_to_serve += gst_buffer_get_size (buf); } return GST_FLOW_OK; @@ -2451,15 +2322,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id, case PROP_BUFFERS_SOFT_MAX: multisocketsink->units_soft_max = g_value_get_int (value); break; - case PROP_TIME_MIN: - multisocketsink->time_min = g_value_get_int64 (value); - break; - case PROP_BYTES_MIN: - multisocketsink->bytes_min = g_value_get_int (value); - break; - case PROP_BUFFERS_MIN: - multisocketsink->buffers_min = g_value_get_int (value); - break; case PROP_UNIT_TYPE: multisocketsink->unit_type = g_value_get_enum (value); break; @@ -2469,15 +2331,6 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id, case PROP_UNITS_SOFT_MAX: multisocketsink->units_soft_max = g_value_get_int64 (value); break; - case PROP_RECOVER_POLICY: - multisocketsink->recover_policy = g_value_get_enum (value); - break; - case PROP_TIMEOUT: - multisocketsink->timeout = g_value_get_uint64 (value); - break; - case PROP_SYNC_METHOD: - multisocketsink->def_sync_method = g_value_get_enum (value); - break; case PROP_BURST_FORMAT: multisocketsink->def_burst_format = g_value_get_enum (value); break; @@ -2516,15 +2369,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, case PROP_BUFFERS_SOFT_MAX: g_value_set_int (value, multisocketsink->units_soft_max); break; - case PROP_TIME_MIN: - g_value_set_int64 (value, multisocketsink->time_min); - break; - case PROP_BYTES_MIN: - g_value_set_int (value, multisocketsink->bytes_min); - break; - case PROP_BUFFERS_MIN: - g_value_set_int (value, multisocketsink->buffers_min); - break; case PROP_BUFFERS_QUEUED: g_value_set_uint (value, multisocketsink->buffers_queued); break; @@ -2543,21 +2387,6 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, case PROP_UNITS_SOFT_MAX: g_value_set_int64 (value, multisocketsink->units_soft_max); break; - case PROP_RECOVER_POLICY: - g_value_set_enum (value, multisocketsink->recover_policy); - break; - case PROP_TIMEOUT: - g_value_set_uint64 (value, multisocketsink->timeout); - break; - case PROP_SYNC_METHOD: - g_value_set_enum (value, multisocketsink->def_sync_method); - break; - case PROP_BYTES_TO_SERVE: - g_value_set_uint64 (value, multisocketsink->bytes_to_serve); - break; - case PROP_BYTES_SERVED: - g_value_set_uint64 (value, multisocketsink->bytes_served); - break; case PROP_BURST_FORMAT: g_value_set_enum (value, multisocketsink->def_burst_format); break; @@ -2591,12 +2420,15 @@ gst_multi_socket_sink_start (GstBaseSink * bsink) { GstMultiSocketSinkClass *fclass; GstMultiSocketSink *this; + GstMultiHandleSink *mhsink; + GList *clients; - if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_SOCKET_SINK_OPEN)) + if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN)) return TRUE; this = GST_MULTI_SOCKET_SINK (bsink); + mhsink = GST_MULTI_HANDLE_SINK (bsink); fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this); GST_INFO_OBJECT (this, "starting"); @@ -2621,8 +2453,8 @@ gst_multi_socket_sink_start (GstBaseSink * bsink) CLIENTS_UNLOCK (this); this->streamheader = NULL; - this->bytes_to_serve = 0; - this->bytes_served = 0; + mhsink->bytes_to_serve = 0; + mhsink->bytes_served = 0; if (fclass->init) { fclass->init (this); @@ -2633,7 +2465,7 @@ gst_multi_socket_sink_start (GstBaseSink * bsink) this->thread = g_thread_new ("multisocketsink", (GThreadFunc) gst_multi_socket_sink_thread, this); - GST_OBJECT_FLAG_SET (this, GST_MULTI_SOCKET_SINK_OPEN); + GST_OBJECT_FLAG_SET (this, GST_MULTI_HANDLE_SINK_OPEN); return TRUE; } @@ -2648,14 +2480,16 @@ static gboolean gst_multi_socket_sink_stop (GstBaseSink * bsink) { GstMultiSocketSinkClass *fclass; + GstMultiHandleSinkClass *mhclass; GstMultiSocketSink *this; GstBuffer *buf; gint i; this = GST_MULTI_SOCKET_SINK (bsink); fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (this); + mhclass = GST_MULTI_HANDLE_SINK_GET_CLASS (this); - if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_SOCKET_SINK_OPEN)) + if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_HANDLE_SINK_OPEN)) return TRUE; this->running = FALSE; @@ -2671,7 +2505,7 @@ gst_multi_socket_sink_stop (GstBaseSink * bsink) } /* free the clients */ - gst_multi_socket_sink_clear (this); + mhclass->clear (GST_MULTI_HANDLE_SINK (this)); if (this->streamheader) { g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); @@ -2703,7 +2537,7 @@ gst_multi_socket_sink_stop (GstBaseSink * bsink) } /* freeing the array is done in _finalize */ } - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_SOCKET_SINK_OPEN); + GST_OBJECT_FLAG_UNSET (this, GST_MULTI_HANDLE_SINK_OPEN); return TRUE; } diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index a447862be7..286605befb 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -50,70 +50,31 @@ G_BEGIN_DECLS typedef struct _GstMultiSocketSink GstMultiSocketSink; typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass; -typedef enum { - GST_MULTI_SOCKET_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_MULTI_SOCKET_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstMultiSocketSinkFlags; - /* structure for a client */ typedef struct { + GstMultiHandleClient client; + GSocket *socket; GSource *source; - gint bufpos; /* position of this client in the global queue */ - gint flushcount; /* the remaining number of buffers to flush out or -1 if the - client is not flushing. */ - - GstClientStatus status; - - GSList *sending; /* the buffers we need to send */ - gint bufoffset; /* offset in the first buffer */ - - gboolean discont; - - gboolean new_connection; - - gboolean currently_removing; - /* method to sync client when connecting */ GstSyncMethod sync_method; GstFormat burst_min_format; guint64 burst_min_value; GstFormat burst_max_format; guint64 burst_max_value; - - GstCaps *caps; /* caps of last queued buffer */ - - /* stats */ - guint64 bytes_sent; - guint64 connect_time; - guint64 disconnect_time; - guint64 last_activity_time; - guint64 dropped_buffers; - guint64 avg_queue_size; - guint64 first_buffer_ts; - guint64 last_buffer_ts; } GstSocketClient; -#define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock)) -#define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock)) -#define CLIENTS_LOCK(socketsink) (g_rec_mutex_lock(&socketsink->clientslock)) -#define CLIENTS_UNLOCK(socketsink) (g_rec_mutex_unlock(&socketsink->clientslock)) - /** * GstMultiSocketSink: * * The multisocketsink object structure. */ struct _GstMultiSocketSink { - GstBaseSink element; + GstMultiHandleSink element; /*< private >*/ - guint64 bytes_to_serve; /* how much bytes we must serve */ - guint64 bytes_served; /* how much bytes have we served */ - GRecMutex clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ GHashTable *socket_hash; /* index on socket to client */ @@ -139,20 +100,10 @@ struct _GstMultiSocketSink { GstFormat unit_type;/* the format of the units */ gint64 units_max; /* max units to queue for a client */ gint64 units_soft_max; /* max units a client can lag before recovery starts */ - GstRecoverPolicy recover_policy; - GstClockTime timeout; /* max amount of nanoseconds to remain idle */ - GstSyncMethod def_sync_method; /* what method to use for connecting clients */ GstFormat def_burst_format; guint64 def_burst_value; - /* these values are used to control the amount of data - * kept in the queues. It allows clients to perform a burst - * on connect. */ - gint bytes_min; /* min number of bytes to queue */ - gint64 time_min; /* min time to queue */ - gint buffers_min; /* min number of buffers to queue */ - gboolean resend_streamheader; /* resend streamheader if it changes */ /* stats */ @@ -164,7 +115,7 @@ struct _GstMultiSocketSink { }; struct _GstMultiSocketSinkClass { - GstBaseSinkClass parent_class; + GstMultiHandleSinkClass parent_class; /* element methods */ void (*add) (GstMultiSocketSink *sink, GSocket *socket); @@ -195,7 +146,7 @@ void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSoc GstFormat max_format, guint64 max_value); void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket); void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket); -void gst_multi_socket_sink_clear (GstMultiSocketSink *sink); +void gst_multi_socket_sink_clear (GstMultiHandleSink *sink); GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket); G_END_DECLS