srtobject: Remove pointless GMainLoop

Just use srt's blocking epoll function and fix locking while we're at it.
This commit is contained in:
Olivier Crête 2019-08-23 16:21:47 -04:00 committed by Nicolas Dufresne
parent 733f5b2851
commit 54dc0b5579
2 changed files with 120 additions and 124 deletions

View file

@ -584,104 +584,103 @@ gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
static gpointer
thread_func (gpointer data)
{
GstSRTObject *srtobject = data;
g_main_loop_run (srtobject->loop);
return NULL;
}
static gboolean
idle_listen_source_cb (gpointer data)
{
GstSRTObject *srtobject = data;
SRTSOCKET caller_sock;
struct sockaddr caller_sa;
gsize caller_sa_len;
union
{
struct sockaddr_storage ss;
struct sockaddr sa;
} caller_sa;
int caller_sa_len;
gint poll_timeout;
SRTSOCKET rsock;
gint rsocklen = 1;
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
for (;;) {
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
&rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
&rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_errno == SRT_ETIMEOUT) {
return TRUE;
} else {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("abort polling: %s", srt_getlasterror_str ()), (NULL));
return FALSE;
if (srtobject->listener_poll_id == SRT_ERROR)
return NULL;
if (srt_errno == SRT_ETIMEOUT) {
continue;
} else {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
("abort polling: %s", srt_getlasterror_str ()), (NULL));
return NULL;
}
}
caller_sock =
srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
if (caller_sock != SRT_INVALID_SOCK) {
SRTCaller *caller;
gint flag = SRT_EPOLL_ERR;
caller = srt_caller_new ();
caller->sockaddr =
g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
caller->poll_id = srt_epoll_create ();
caller->sock = caller_sock;
if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SRC) {
flag |= SRT_EPOLL_IN;
} else {
flag |= SRT_EPOLL_OUT;
}
if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
("%s", srt_getlasterror_str ()), (NULL));
srt_caller_free (caller);
/* try-again */
continue;
}
GST_OBJECT_LOCK (srtobject->element);
srtobject->callers = g_list_append (srtobject->callers, caller);
g_cond_signal (&srtobject->sock_cond);
GST_OBJECT_UNLOCK (srtobject->element);
/* notifying caller-added */
if (srtobject->caller_added_closure != NULL) {
GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
g_value_init (&values[0], G_TYPE_INT);
g_value_set_int (&values[0], caller->sock);
g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
g_value_set_object (&values[1], caller->sockaddr);
g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
NULL);
g_value_unset (&values[1]);
}
GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
GST_URI_SRC)
return NULL;
}
}
caller_sock =
srt_accept (srtobject->listener_sock, &caller_sa, (int *) &caller_sa_len);
if (caller_sock != SRT_INVALID_SOCK) {
SRTCaller *caller;
gint flag = SRT_EPOLL_ERR;
caller = srt_caller_new ();
caller->sockaddr =
g_socket_address_new_from_native (&caller_sa, caller_sa_len);
caller->poll_id = srt_epoll_create ();
caller->sock = caller_sock;
if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SRC) {
flag |= SRT_EPOLL_IN;
} else {
flag |= SRT_EPOLL_OUT;
}
if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
("%s", srt_getlasterror_str ()), (NULL));
srt_caller_free (caller);
/* try-again */
return TRUE;
}
GST_OBJECT_LOCK (srtobject->element);
srtobject->callers = g_list_append (srtobject->callers, caller);
g_cond_signal (&srtobject->sock_cond);
GST_OBJECT_UNLOCK (srtobject->element);
/* notifying caller-added */
if (srtobject->caller_added_closure != NULL) {
GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
g_value_init (&values[0], G_TYPE_INT);
g_value_set_int (&values[0], caller->sock);
g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
g_value_set_object (&values[1], caller->sockaddr);
g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values, NULL);
g_value_unset (&values[1]);
}
GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
}
/* only one caller is allowed if the element is source. */
return gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) !=
GST_URI_SRC;
}
static gboolean
@ -752,15 +751,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
srtobject->listener_sock = sock;
srtobject->context = g_main_context_new ();
srtobject->loop = g_main_loop_new (srtobject->context, TRUE);
srtobject->listener_source = g_idle_source_new ();
g_source_set_callback (srtobject->listener_source,
(GSourceFunc) idle_listen_source_cb, srtobject, NULL);
g_source_attach (srtobject->listener_source, srtobject->context);
srtobject->thread =
g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
@ -772,9 +762,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
failed:
g_clear_pointer (&srtobject->loop, g_main_loop_unref);
g_clear_pointer (&srtobject->context, g_main_context_unref);
if (srtobject->listener_poll_id != SRT_ERROR) {
srt_epoll_release (srtobject->listener_poll_id);
}
@ -1019,6 +1006,7 @@ out:
void
gst_srt_object_close (GstSRTObject * srtobject)
{
GST_OBJECT_LOCK (srtobject->element);
if (srtobject->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
}
@ -1032,20 +1020,16 @@ gst_srt_object_close (GstSRTObject * srtobject)
srtobject->sock = SRT_INVALID_SOCK;
}
if (srtobject->loop) {
g_main_loop_quit (srtobject->loop);
if (srtobject->listener_poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->listener_poll_id,
srtobject->listener_sock);
srtobject->listener_poll_id = SRT_ERROR;
}
g_thread_join (srtobject->thread);
g_clear_pointer (&srtobject->thread, g_thread_unref);
g_clear_pointer (&srtobject->loop, g_main_loop_unref);
g_clear_pointer (&srtobject->context, g_main_context_unref);
if (srtobject->listener_poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->listener_poll_id,
srtobject->listener_sock);
srtobject->listener_poll_id = SRT_ERROR;
}
if (srtobject->thread) {
GThread *thread = g_steal_pointer (&srtobject->thread);
GST_OBJECT_UNLOCK (srtobject->element);
g_thread_join (thread);
GST_OBJECT_LOCK (srtobject->element);
}
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
@ -1056,14 +1040,20 @@ gst_srt_object_close (GstSRTObject * srtobject)
srtobject->listener_sock = SRT_INVALID_SOCK;
}
g_list_foreach (srtobject->callers, (GFunc) srt_caller_invoke_removed_closure,
srtobject);
g_list_free_full (srtobject->callers, (GDestroyNotify) srt_caller_free);
if (srtobject->callers) {
GList *callers = g_steal_pointer (&srtobject->callers);
GST_OBJECT_UNLOCK (srtobject->element);
g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
srtobject);
GST_OBJECT_LOCK (srtobject->element);
g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
}
g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
srtobject->opened = FALSE;
GST_OBJECT_UNLOCK (srtobject->element);
}
static gboolean
@ -1076,7 +1066,7 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
GST_OBJECT_LOCK (srtobject->element);
while (!g_cancellable_is_cancelled (cancellable)) {
ret = g_list_length (srtobject->callers) >= 1;
ret = (srtobject->callers != NULL);
if (ret)
break;
g_cond_wait (&srtobject->sock_cond,
@ -1096,7 +1086,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
gssize len = 0;
gint poll_timeout;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
gint poll_id;
gint poll_id = SRT_ERROR;
/* Only source element can read data */
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
@ -1111,9 +1101,13 @@ gst_srt_object_read (GstSRTObject * srtobject,
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
GST_OBJECT_LOCK (srtobject->element);
caller = srtobject->callers->data;
poll_id = caller->poll_id;
if (srtobject->callers)
poll_id = caller->poll_id;
GST_OBJECT_UNLOCK (srtobject->element);
if (poll_id == SRT_ERROR)
return 0;
} else {
poll_id = srtobject->poll_id;
}
@ -1127,9 +1121,16 @@ gst_srt_object_read (GstSRTObject * srtobject,
SRTSOCKET rsock;
gint rsocklen = 1;
int pollret;
if (srt_epoll_wait (poll_id, &rsock,
&rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
pollret = srt_epoll_wait (poll_id, &rsock,
&rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
if (pollret < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srt_errno != SRT_ETIMEOUT) {
return 0;
}
continue;
}
@ -1288,9 +1289,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
err:
srtobject->callers = g_list_remove (srtobject->callers, caller);
srt_caller_invoke_removed_closure (caller, srtobject);
GST_OBJECT_UNLOCK (srtobject->element);
srt_caller_free (caller);
GST_OBJECT_LOCK (srtobject->element);
}
GST_OBJECT_UNLOCK (srtobject->element);

View file

@ -61,9 +61,6 @@ struct _GstSRTObject
SRTSOCKET listener_sock;
gint listener_poll_id;
GMainLoop *loop;
GMainContext *context;
GSource *listener_source;
GThread *thread;
GList *callers;