diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index b9f22e0003..f9acc13f10 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -137,11 +137,13 @@ enum }; #define DEFAULT_SEND_DISPATCHED FALSE +#define DEFAULT_SEND_MESSAGES FALSE enum { PROP_0, PROP_SEND_DISPATCHED, + PROP_SEND_MESSAGES, PROP_LAST }; @@ -223,10 +225,41 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) gobject_class->get_property = gst_multi_socket_sink_get_property; gobject_class->finalize = gst_multi_socket_sink_finalize; + /** + * GstMultiSocketSink:send-dispatched: + * + * Sends a GstNetworkMessageDispatched event upstream whenever a buffer + * is sent to a client. + * The event is a CUSTOM event name GstNetworkMessageDispatched and + * contains: + * + * "object" G_TYPE_OBJECT : the object identifying the client + * "buffer" GST_TYPE_BUFFER : the buffer sent to the client + * + * Since: 1.8.0 + */ g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED, g_param_spec_boolean ("send-dispatched", "Send Dispatched", "If GstNetworkMessageDispatched events should be pushed", DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstMultiSocketSink:send-messages: + * + * Sends a GstNetworkMessage event upstream whenever a buffer + * is received from a client. + * The event is a CUSTOM event name GstNetworkMessage and contains: + * + * "object" G_TYPE_OBJECT : the object identifying the client + * "buffer" GST_TYPE_BUFFER : the buffer with data received from the + * client + * + * Since: 1.8.0 + */ + g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES, + g_param_spec_boolean ("send-messages", "Send Messages", + "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstMultiSocketSink::add: * @gstmultisocketsink: the multisocketsink element to emit this signal on @@ -416,6 +449,7 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this) this->cancellable = g_cancellable_new (); this->send_dispatched = DEFAULT_SEND_DISPATCHED; + this->send_messages = DEFAULT_SEND_MESSAGES; } static void @@ -569,38 +603,49 @@ static gboolean gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, GstSocketClient * client) { - gboolean ret; - gchar dummy[256]; + gboolean ret, do_event; + gchar dummy[256], *mem, *omem; gssize nread; GError *err = NULL; gboolean first = TRUE; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; + gssize navail, maxmem; GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug); ret = TRUE; + navail = g_socket_get_available_bytes (mhclient->handle.socket); + if (navail <= 0) + return TRUE; + + /* only collect the data in a buffer when we need to send it with an event */ + do_event = sink->send_messages; + if (do_event) { + omem = mem = g_malloc (navail); + maxmem = navail; + } else { + mem = dummy; + maxmem = sizeof (dummy); + } + /* just Read 'n' Drop, could also just drop the client as it's not supposed * to write to us except for closing the socket, I guess it's because we * like to listen to our customers. */ - do { - gssize navail; - + while (navail > 0) { GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug); - navail = g_socket_get_available_bytes (mhclient->handle.socket); - if (navail <= 0) - break; - nread = - g_socket_receive (mhclient->handle.socket, dummy, MIN (navail, - sizeof (dummy)), sink->cancellable, &err); + g_socket_receive (mhclient->handle.socket, mem, MIN (navail, + maxmem), sink->cancellable, &err); + if (first && nread == 0) { /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; + break; } else if (nread < 0) { GST_WARNING_OBJECT (sink, "%s could not read: %s", mhclient->debug, err->message); @@ -608,10 +653,29 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, ret = FALSE; break; } + navail -= nread; + if (do_event) + mem += nread; first = FALSE; - } while (nread > 0); + } g_clear_error (&err); + if (do_event) { + if (ret) { + GstBuffer *buf; + GstEvent *ev; + + buf = gst_buffer_new_wrapped (omem, maxmem); + ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstNetworkMessage", + "object", G_TYPE_OBJECT, mhclient->handle.socket, + "buffer", GST_TYPE_BUFFER, buf, NULL)); + gst_buffer_unref (buf); + + gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev); + } else + g_free (omem); + } return ret; } @@ -1114,6 +1178,9 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id, case PROP_SEND_DISPATCHED: sink->send_dispatched = g_value_get_boolean (value); break; + case PROP_SEND_MESSAGES: + sink->send_messages = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1130,6 +1197,9 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, case PROP_SEND_DISPATCHED: g_value_set_boolean (value, sink->send_dispatched); break; + case PROP_SEND_MESSAGES: + g_value_set_boolean (value, sink->send_messages); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index c68c9abc22..c89844d939 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -68,6 +68,7 @@ struct _GstMultiSocketSink { /*< private >*/ GMainContext *main_context; GCancellable *cancellable; + gboolean send_messages; gboolean send_dispatched; };