multisocketsink: add GstNetworkMessage event

Add a property and logic to send a GstNetworkMessage event containing
the message that was received from a client. This can be used to
implement simply bidirectional communication.
This commit is contained in:
Wim Taymans 2015-12-10 12:18:04 +01:00
parent 9aaaa26ff3
commit 9c2bcd7b76
2 changed files with 83 additions and 12 deletions

View file

@ -137,11 +137,13 @@ enum
}; };
#define DEFAULT_SEND_DISPATCHED FALSE #define DEFAULT_SEND_DISPATCHED FALSE
#define DEFAULT_SEND_MESSAGES FALSE
enum enum
{ {
PROP_0, PROP_0,
PROP_SEND_DISPATCHED, PROP_SEND_DISPATCHED,
PROP_SEND_MESSAGES,
PROP_LAST PROP_LAST
}; };
@ -223,10 +225,41 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
gobject_class->get_property = gst_multi_socket_sink_get_property; gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize; gobject_class->finalize = gst_multi_socket_sink_finalize;
/**
* GstMultiSocketSink:send-dispatched:
*
* Sends a GstNetworkMessageDispatched event upstream whenever a buffer
* is sent to a client.
* The event is a CUSTOM event name GstNetworkMessageDispatched and
* contains:
*
* "object" G_TYPE_OBJECT : the object identifying the client
* "buffer" GST_TYPE_BUFFER : the buffer sent to the client
*
* Since: 1.8.0
*/
g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED, g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
g_param_spec_boolean ("send-dispatched", "Send Dispatched", g_param_spec_boolean ("send-dispatched", "Send Dispatched",
"If GstNetworkMessageDispatched events should be pushed", "If GstNetworkMessageDispatched events should be pushed",
DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiSocketSink:send-messages:
*
* Sends a GstNetworkMessage event upstream whenever a buffer
* is received from a client.
* The event is a CUSTOM event name GstNetworkMessage and contains:
*
* "object" G_TYPE_OBJECT : the object identifying the client
* "buffer" GST_TYPE_BUFFER : the buffer with data received from the
* client
*
* Since: 1.8.0
*/
g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
g_param_spec_boolean ("send-messages", "Send Messages",
"If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstMultiSocketSink::add: * GstMultiSocketSink::add:
* @gstmultisocketsink: the multisocketsink element to emit this signal on * @gstmultisocketsink: the multisocketsink element to emit this signal on
@ -416,6 +449,7 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
this->cancellable = g_cancellable_new (); this->cancellable = g_cancellable_new ();
this->send_dispatched = DEFAULT_SEND_DISPATCHED; this->send_dispatched = DEFAULT_SEND_DISPATCHED;
this->send_messages = DEFAULT_SEND_MESSAGES;
} }
static void static void
@ -569,38 +603,49 @@ static gboolean
gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
GstSocketClient * client) GstSocketClient * client)
{ {
gboolean ret; gboolean ret, do_event;
gchar dummy[256]; gchar dummy[256], *mem, *omem;
gssize nread; gssize nread;
GError *err = NULL; GError *err = NULL;
gboolean first = TRUE; gboolean first = TRUE;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
gssize navail, maxmem;
GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug); GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
ret = TRUE; ret = TRUE;
navail = g_socket_get_available_bytes (mhclient->handle.socket);
if (navail <= 0)
return TRUE;
/* only collect the data in a buffer when we need to send it with an event */
do_event = sink->send_messages;
if (do_event) {
omem = mem = g_malloc (navail);
maxmem = navail;
} else {
mem = dummy;
maxmem = sizeof (dummy);
}
/* just Read 'n' Drop, could also just drop the client as it's not supposed /* just Read 'n' Drop, could also just drop the client as it's not supposed
* to write to us except for closing the socket, I guess it's because we * to write to us except for closing the socket, I guess it's because we
* like to listen to our customers. */ * like to listen to our customers. */
do { while (navail > 0) {
gssize navail;
GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug); GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
navail = g_socket_get_available_bytes (mhclient->handle.socket);
if (navail <= 0)
break;
nread = nread =
g_socket_receive (mhclient->handle.socket, dummy, MIN (navail, g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
sizeof (dummy)), sink->cancellable, &err); maxmem), sink->cancellable, &err);
if (first && nread == 0) { if (first && nread == 0) {
/* client sent close, so remove it */ /* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
mhclient->debug); mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED; mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE; ret = FALSE;
break;
} else if (nread < 0) { } else if (nread < 0) {
GST_WARNING_OBJECT (sink, "%s could not read: %s", GST_WARNING_OBJECT (sink, "%s could not read: %s",
mhclient->debug, err->message); mhclient->debug, err->message);
@ -608,10 +653,29 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
ret = FALSE; ret = FALSE;
break; break;
} }
navail -= nread;
if (do_event)
mem += nread;
first = FALSE; first = FALSE;
} while (nread > 0); }
g_clear_error (&err); g_clear_error (&err);
if (do_event) {
if (ret) {
GstBuffer *buf;
GstEvent *ev;
buf = gst_buffer_new_wrapped (omem, maxmem);
ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstNetworkMessage",
"object", G_TYPE_OBJECT, mhclient->handle.socket,
"buffer", GST_TYPE_BUFFER, buf, NULL));
gst_buffer_unref (buf);
gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
} else
g_free (omem);
}
return ret; return ret;
} }
@ -1114,6 +1178,9 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
case PROP_SEND_DISPATCHED: case PROP_SEND_DISPATCHED:
sink->send_dispatched = g_value_get_boolean (value); sink->send_dispatched = g_value_get_boolean (value);
break; break;
case PROP_SEND_MESSAGES:
sink->send_messages = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -1130,6 +1197,9 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
case PROP_SEND_DISPATCHED: case PROP_SEND_DISPATCHED:
g_value_set_boolean (value, sink->send_dispatched); g_value_set_boolean (value, sink->send_dispatched);
break; break;
case PROP_SEND_MESSAGES:
g_value_set_boolean (value, sink->send_messages);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;

View file

@ -68,6 +68,7 @@ struct _GstMultiSocketSink {
/*< private >*/ /*< private >*/
GMainContext *main_context; GMainContext *main_context;
GCancellable *cancellable; GCancellable *cancellable;
gboolean send_messages;
gboolean send_dispatched; gboolean send_dispatched;
}; };