ipcpipeline: use GstPoll instead of select() to watch for socket activity

... and make that code more readable in the process

https://bugzilla.gnome.org/show_bug.cgi?id=787208
This commit is contained in:
George Kiagiadakis 2017-09-04 15:52:03 +03:00
parent 91edec25dd
commit 15927b6511
2 changed files with 84 additions and 103 deletions

View file

@ -1605,15 +1605,16 @@ gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
(GDestroyNotify) comm_request_free);
comm->adapter = gst_adapter_new ();
g_atomic_int_set (&comm->thread_running, 0);
comm->poll = gst_poll_new (TRUE);
gst_poll_fd_init (&comm->pollFDin);
}
void
gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
{
g_assert (!g_atomic_int_get (&comm->thread_running));
g_hash_table_destroy (comm->waiting_ids);
gst_object_unref (comm->adapter);
gst_poll_free (comm->poll);
g_mutex_clear (&comm->mutex);
}
@ -1698,91 +1699,75 @@ gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
return TRUE;
}
static gboolean
static gint
update_adapter (GstIpcPipelineComm * comm)
{
GstMemory *mem = NULL;
GstBuffer *buf;
GstMapInfo map;
ssize_t sz;
gint ret = 0;
for (;;) {
fd_set set;
struct timeval tv;
int sret;
ssize_t sz;
GstBuffer *buf;
GstMapInfo map;
int fdin = comm->fdin;
int fdclose = comm->reader_thread_stopping_pipe[0];
int fdmax;
FD_ZERO (&set);
FD_SET (fdclose, &set);
fdmax = fdclose;
if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) {
FD_SET (fdin, &set);
if (fdin > fdmax)
fdmax = fdin;
again:
/* update pollFDin if necessary (fdin changed or we lost our parent).
* we do not allow a parent-less element to communicate with its peer
* in order to avoid race conditions where the slave tries to change
* the state of its parent pipeline while it is not yet added in that
* pipeline. */
if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
if (comm->pollFDin.fd != -1) {
GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
comm->pollFDin.fd);
gst_poll_remove_fd (comm->poll, &comm->pollFDin);
gst_poll_fd_init (&comm->pollFDin);
}
tv.tv_sec = 0;
tv.tv_usec = 100000;
sret = select (fdmax + 1, &set, NULL, NULL, &tv);
if (sret < 0) {
if (errno == EAGAIN)
continue;
if (errno == EINTR)
break;
if (mem)
gst_memory_unref (mem);
return FALSE;
if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
comm->pollFDin.fd = comm->fdin;
gst_poll_add_fd (comm->poll, &comm->pollFDin);
gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
}
if (FD_ISSET (fdclose, &set)) {
GST_INFO_OBJECT (comm->element, "data received on close notify pipe");
comm->reader_thread_stopping = TRUE;
break;
}
if (fdin < 0)
break;
if (!FD_ISSET (fdin, &set))
break;
if (mem == NULL)
mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
gst_memory_map (mem, &map, GST_MAP_WRITE);
sz = read (fdin, map.data, map.size);
gst_memory_unmap (mem, &map);
if (sz < 0) {
if (errno == EAGAIN)
continue;
mem = NULL;
if (errno == EINTR)
break;
gst_memory_unref (mem);
return FALSE;
}
if (sz == 0) {
GST_INFO_OBJECT (comm->element, "fd closed");
comm->reader_thread_stopping = TRUE;
break;
}
gst_memory_resize (mem, 0, sz);
buf = gst_buffer_new ();
gst_buffer_append_memory (buf, mem);
mem = NULL;
GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
gst_adapter_push (comm->adapter, buf);
/* If we have more data, we loop, otherwise we break */
FD_ZERO (&set);
if (fdin >= 0)
FD_SET (comm->fdin, &set);
tv.tv_sec = 0;
tv.tv_usec = 0;
sret = select (fdin + 1, &set, NULL, NULL, &tv);
if (sret < 0 || !FD_ISSET (fdin, &set))
break;
}
/* wait for activity on fdin or a flush */
if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
if (errno == EAGAIN)
goto again;
/* error out, unless interrupted or flushing */
if (errno != EINTR)
ret = (errno == EBUSY) ? 2 : 1;
}
/* read from fdin if possible and push data to our adapter */
if (comm->pollFDin.fd >= 0
&& gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
if (!mem)
mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
gst_memory_map (mem, &map, GST_MAP_WRITE);
sz = read (comm->pollFDin.fd, map.data, map.size);
gst_memory_unmap (mem, &map);
if (sz <= 0) {
if (errno == EAGAIN)
goto again;
/* error out, unless interrupted */
if (errno != EINTR)
ret = 1;
} else {
gst_memory_resize (mem, 0, sz);
buf = gst_buffer_new ();
gst_buffer_append_memory (buf, mem);
mem = NULL;
GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
gst_adapter_push (comm->adapter, buf);
}
}
if (mem)
gst_memory_unref (mem);
return TRUE;
return ret;
}
static gboolean
@ -2151,24 +2136,28 @@ static gpointer
reader_thread (gpointer data)
{
GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
gboolean running = TRUE;
gint ret = 0;
g_atomic_int_set (&comm->thread_running, 1);
while (!comm->reader_thread_stopping) {
if (!update_adapter (comm)) {
if (comm->reader_thread_stopping) {
GST_INFO_OBJECT (comm->element, "We're stopping, all good");
while (running) {
ret = update_adapter (comm);
switch (ret) {
case 1:
GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
("Failed to read from socket"));
running = FALSE;
break;
case 2:
GST_INFO_OBJECT (comm->element, "We're stopping, all good");
running = FALSE;
break;
default:
read_many (comm);
break;
}
GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
("Failed to read from socket"));
break;
}
read_many (comm);
}
GST_INFO_OBJECT (comm->element, "Reader thread ending");
g_atomic_int_set (&comm->thread_running, 0);
return NULL;
}
@ -2184,7 +2173,6 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
if (comm->reader_thread)
return FALSE;
comm->reader_thread_stopping = FALSE;
comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
comm->on_buffer = on_buffer;
comm->on_event = on_event;
@ -2193,10 +2181,7 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
comm->on_state_lost = on_state_lost;
comm->on_message = on_message;
comm->user_data = user_data;
if (pipe (comm->reader_thread_stopping_pipe) < 0) {
GST_WARNING_OBJECT (comm->element, "Failed to create pipes");
return FALSE;
}
gst_poll_set_flushing (comm->poll, FALSE);
comm->reader_thread =
g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
return TRUE;
@ -2205,15 +2190,11 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
void
gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
{
char dummy = 0;
if (!comm->reader_thread)
return;
while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0
&& errno == EINTR);
gst_poll_set_flushing (comm->poll, TRUE);
g_thread_join (comm->reader_thread);
close (comm->reader_thread_stopping_pipe[0]);
close (comm->reader_thread_stopping_pipe[1]);
comm->reader_thread = NULL;
}

View file

@ -64,9 +64,9 @@ typedef struct
GHashTable *waiting_ids;
GThread *reader_thread;
gboolean reader_thread_stopping;
volatile gint thread_running;
int reader_thread_stopping_pipe[2];
GstPoll *poll;
GstPollFD pollFDin;
GstAdapter *adapter;
guint8 state;
guint32 send_id;