mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-03-30 12:49:40 +00:00
multisocketsink: keep on reading when we stop sending
When we stop sending because we need more data, still keep a GSource around to receive data from the clients. Also handle read and write in the same go.
This commit is contained in:
parent
2f3eb47a95
commit
01f5ca3da8
2 changed files with 47 additions and 32 deletions
|
@ -181,6 +181,8 @@ static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
|
|||
GstMultiHandleClient * mhclient);
|
||||
static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
|
||||
GstMultiHandleClient * mhclient);
|
||||
static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
|
||||
GstSocketClient * client);
|
||||
|
||||
static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
|
||||
handle, GIOCondition condition, GstMultiSocketSink * sink);
|
||||
|
@ -764,12 +766,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
|
|||
if (mhclient->bufpos == -1) {
|
||||
/* client is too fast, remove from write queue until new buffer is
|
||||
* available */
|
||||
/* FIXME: specific */
|
||||
if (client->source) {
|
||||
g_source_destroy (client->source);
|
||||
g_source_unref (client->source);
|
||||
client->source = NULL;
|
||||
}
|
||||
gst_multi_socket_sink_stop_sending (sink, client);
|
||||
|
||||
/* if we flushed out all of the client buffers, we can stop */
|
||||
if (mhclient->flushcount == 0)
|
||||
|
@ -793,13 +790,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
|
|||
mhclient->bufpos = position;
|
||||
} else {
|
||||
/* cannot send data to this client yet */
|
||||
/* FIXME: specific */
|
||||
if (client->source) {
|
||||
g_source_destroy (client->source);
|
||||
g_source_unref (client->source);
|
||||
client->source = NULL;
|
||||
}
|
||||
|
||||
gst_multi_socket_sink_stop_sending (sink, client);
|
||||
return TRUE;
|
||||
}
|
||||
}
|
||||
|
@ -908,6 +899,33 @@ write_error:
|
|||
}
|
||||
}
|
||||
|
||||
static void
|
||||
ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
|
||||
GIOCondition condition)
|
||||
{
|
||||
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
|
||||
|
||||
if (client->condition == condition)
|
||||
return;
|
||||
|
||||
if (client->source) {
|
||||
g_source_destroy (client->source);
|
||||
g_source_unref (client->source);
|
||||
}
|
||||
if (condition && sink->main_context) {
|
||||
client->source = g_socket_create_source (mhclient->handle.socket,
|
||||
condition, sink->cancellable);
|
||||
g_source_set_callback (client->source,
|
||||
(GSourceFunc) gst_multi_socket_sink_socket_condition,
|
||||
gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
|
||||
g_source_attach (client->source, sink->main_context);
|
||||
} else {
|
||||
client->source = NULL;
|
||||
condition = 0;
|
||||
}
|
||||
client->condition = condition;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
|
||||
GstMultiHandleClient * mhclient)
|
||||
|
@ -915,31 +933,25 @@ gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
|
|||
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
|
||||
GstSocketClient *client = (GstSocketClient *) (mhclient);
|
||||
|
||||
if (!sink->main_context)
|
||||
return;
|
||||
|
||||
if (!client->source) {
|
||||
client->source =
|
||||
g_socket_create_source (mhclient->handle.socket,
|
||||
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
|
||||
g_source_set_callback (client->source,
|
||||
(GSourceFunc) gst_multi_socket_sink_socket_condition,
|
||||
gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
|
||||
g_source_attach (client->source, sink->main_context);
|
||||
}
|
||||
ensure_condition (sink, client,
|
||||
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
|
||||
GstMultiHandleClient * mhclient)
|
||||
{
|
||||
GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
|
||||
GstSocketClient *client = (GstSocketClient *) (mhclient);
|
||||
|
||||
if (client->source) {
|
||||
g_source_destroy (client->source);
|
||||
g_source_unref (client->source);
|
||||
client->source = NULL;
|
||||
}
|
||||
ensure_condition (sink, client, 0);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
|
||||
GstSocketClient * client)
|
||||
{
|
||||
ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
|
||||
}
|
||||
|
||||
/* Handle the clients. This is called when a socket becomes ready
|
||||
|
@ -987,14 +999,16 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
|
|||
gst_multi_handle_sink_remove_client_link (mhsink, clink);
|
||||
ret = FALSE;
|
||||
goto done;
|
||||
} else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
|
||||
}
|
||||
if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
|
||||
/* handle client read */
|
||||
if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
|
||||
gst_multi_handle_sink_remove_client_link (mhsink, clink);
|
||||
ret = FALSE;
|
||||
goto done;
|
||||
}
|
||||
} else if ((condition & G_IO_OUT)) {
|
||||
}
|
||||
if ((condition & G_IO_OUT)) {
|
||||
/* handle client write */
|
||||
if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
|
||||
gst_multi_handle_sink_remove_client_link (mhsink, clink);
|
||||
|
|
|
@ -54,6 +54,7 @@ typedef struct {
|
|||
GstMultiHandleClient client;
|
||||
|
||||
GSource *source;
|
||||
GIOCondition condition;
|
||||
} GstSocketClient;
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue