From 15927b6511bc8304ae144a45c9fbfca88e5dd641 Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Mon, 4 Sep 2017 15:52:03 +0300 Subject: [PATCH] ipcpipeline: use GstPoll instead of select() to watch for socket activity ... and make that code more readable in the process https://bugzilla.gnome.org/show_bug.cgi?id=787208 --- sys/ipcpipeline/gstipcpipelinecomm.c | 181 ++++++++++++--------------- sys/ipcpipeline/gstipcpipelinecomm.h | 6 +- 2 files changed, 84 insertions(+), 103 deletions(-) diff --git a/sys/ipcpipeline/gstipcpipelinecomm.c b/sys/ipcpipeline/gstipcpipelinecomm.c index e21da99f89..6f17a11fe1 100644 --- a/sys/ipcpipeline/gstipcpipelinecomm.c +++ b/sys/ipcpipeline/gstipcpipelinecomm.c @@ -1605,15 +1605,16 @@ gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element) g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) comm_request_free); comm->adapter = gst_adapter_new (); - g_atomic_int_set (&comm->thread_running, 0); + comm->poll = gst_poll_new (TRUE); + gst_poll_fd_init (&comm->pollFDin); } void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm) { - g_assert (!g_atomic_int_get (&comm->thread_running)); g_hash_table_destroy (comm->waiting_ids); gst_object_unref (comm->adapter); + gst_poll_free (comm->poll); g_mutex_clear (&comm->mutex); } @@ -1698,91 +1699,75 @@ gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id, return TRUE; } -static gboolean +static gint update_adapter (GstIpcPipelineComm * comm) { GstMemory *mem = NULL; + GstBuffer *buf; + GstMapInfo map; + ssize_t sz; + gint ret = 0; - for (;;) { - fd_set set; - struct timeval tv; - int sret; - ssize_t sz; - GstBuffer *buf; - GstMapInfo map; - int fdin = comm->fdin; - int fdclose = comm->reader_thread_stopping_pipe[0]; - int fdmax; - - FD_ZERO (&set); - FD_SET (fdclose, &set); - fdmax = fdclose; - if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) { - FD_SET (fdin, &set); - if (fdin > fdmax) - fdmax = fdin; +again: + /* update pollFDin if necessary (fdin changed or we lost our parent). + * we do not allow a parent-less element to communicate with its peer + * in order to avoid race conditions where the slave tries to change + * the state of its parent pipeline while it is not yet added in that + * pipeline. */ + if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) { + if (comm->pollFDin.fd != -1) { + GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d", + comm->pollFDin.fd); + gst_poll_remove_fd (comm->poll, &comm->pollFDin); + gst_poll_fd_init (&comm->pollFDin); } - tv.tv_sec = 0; - tv.tv_usec = 100000; - sret = select (fdmax + 1, &set, NULL, NULL, &tv); - if (sret < 0) { - if (errno == EAGAIN) - continue; - if (errno == EINTR) - break; - if (mem) - gst_memory_unref (mem); - return FALSE; + if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) { + GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin); + comm->pollFDin.fd = comm->fdin; + gst_poll_add_fd (comm->poll, &comm->pollFDin); + gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE); } - if (FD_ISSET (fdclose, &set)) { - GST_INFO_OBJECT (comm->element, "data received on close notify pipe"); - comm->reader_thread_stopping = TRUE; - break; - } - if (fdin < 0) - break; - if (!FD_ISSET (fdin, &set)) - break; - if (mem == NULL) - mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); - gst_memory_map (mem, &map, GST_MAP_WRITE); - sz = read (fdin, map.data, map.size); - gst_memory_unmap (mem, &map); - if (sz < 0) { - if (errno == EAGAIN) - continue; - mem = NULL; - if (errno == EINTR) - break; - gst_memory_unref (mem); - return FALSE; - } - if (sz == 0) { - GST_INFO_OBJECT (comm->element, "fd closed"); - comm->reader_thread_stopping = TRUE; - break; - } - gst_memory_resize (mem, 0, sz); - buf = gst_buffer_new (); - gst_buffer_append_memory (buf, mem); - mem = NULL; - GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); - gst_adapter_push (comm->adapter, buf); - - /* If we have more data, we loop, otherwise we break */ - FD_ZERO (&set); - if (fdin >= 0) - FD_SET (comm->fdin, &set); - tv.tv_sec = 0; - tv.tv_usec = 0; - sret = select (fdin + 1, &set, NULL, NULL, &tv); - if (sret < 0 || !FD_ISSET (fdin, &set)) - break; } + + /* wait for activity on fdin or a flush */ + if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) { + if (errno == EAGAIN) + goto again; + /* error out, unless interrupted or flushing */ + if (errno != EINTR) + ret = (errno == EBUSY) ? 2 : 1; + } + + /* read from fdin if possible and push data to our adapter */ + if (comm->pollFDin.fd >= 0 + && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) { + if (!mem) + mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); + + gst_memory_map (mem, &map, GST_MAP_WRITE); + sz = read (comm->pollFDin.fd, map.data, map.size); + gst_memory_unmap (mem, &map); + + if (sz <= 0) { + if (errno == EAGAIN) + goto again; + /* error out, unless interrupted */ + if (errno != EINTR) + ret = 1; + } else { + gst_memory_resize (mem, 0, sz); + buf = gst_buffer_new (); + gst_buffer_append_memory (buf, mem); + mem = NULL; + GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); + gst_adapter_push (comm->adapter, buf); + } + } + if (mem) gst_memory_unref (mem); - return TRUE; + return ret; } static gboolean @@ -2151,24 +2136,28 @@ static gpointer reader_thread (gpointer data) { GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data; + gboolean running = TRUE; + gint ret = 0; - g_atomic_int_set (&comm->thread_running, 1); - while (!comm->reader_thread_stopping) { - if (!update_adapter (comm)) { - if (comm->reader_thread_stopping) { - GST_INFO_OBJECT (comm->element, "We're stopping, all good"); + while (running) { + ret = update_adapter (comm); + switch (ret) { + case 1: + GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), + ("Failed to read from socket")); + running = FALSE; + break; + case 2: + GST_INFO_OBJECT (comm->element, "We're stopping, all good"); + running = FALSE; + break; + default: + read_many (comm); break; - } - GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), - ("Failed to read from socket")); - break; } - read_many (comm); } GST_INFO_OBJECT (comm->element, "Reader thread ending"); - g_atomic_int_set (&comm->thread_running, 0); - return NULL; } @@ -2184,7 +2173,6 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, if (comm->reader_thread) return FALSE; - comm->reader_thread_stopping = FALSE; comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; comm->on_buffer = on_buffer; comm->on_event = on_event; @@ -2193,10 +2181,7 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, comm->on_state_lost = on_state_lost; comm->on_message = on_message; comm->user_data = user_data; - if (pipe (comm->reader_thread_stopping_pipe) < 0) { - GST_WARNING_OBJECT (comm->element, "Failed to create pipes"); - return FALSE; - } + gst_poll_set_flushing (comm->poll, FALSE); comm->reader_thread = g_thread_new ("reader", (GThreadFunc) reader_thread, comm); return TRUE; @@ -2205,15 +2190,11 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm) { - char dummy = 0; - if (!comm->reader_thread) return; - while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0 - && errno == EINTR); + + gst_poll_set_flushing (comm->poll, TRUE); g_thread_join (comm->reader_thread); - close (comm->reader_thread_stopping_pipe[0]); - close (comm->reader_thread_stopping_pipe[1]); comm->reader_thread = NULL; } diff --git a/sys/ipcpipeline/gstipcpipelinecomm.h b/sys/ipcpipeline/gstipcpipelinecomm.h index bd7335e9e2..cebbadb215 100644 --- a/sys/ipcpipeline/gstipcpipelinecomm.h +++ b/sys/ipcpipeline/gstipcpipelinecomm.h @@ -64,9 +64,9 @@ typedef struct GHashTable *waiting_ids; GThread *reader_thread; - gboolean reader_thread_stopping; - volatile gint thread_running; - int reader_thread_stopping_pipe[2]; + GstPoll *poll; + GstPollFD pollFDin; + GstAdapter *adapter; guint8 state; guint32 send_id;