srt: Move cancellable into srtobject

Should produce no difference in behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4087>
This commit is contained in:
Jan Alexander Steffens (heftig) 2023-03-01 16:00:39 -05:00 committed by GStreamer Marge Bot
parent 4cede7c472
commit 3f75836822
6 changed files with 74 additions and 88 deletions

View file

@ -192,7 +192,7 @@ static gint srt_init_refcount = 0;
static GSocketAddress *
gst_srt_object_resolve (GstSRTObject * srtobject, const gchar * address,
guint port, GCancellable * cancellable, GError ** err_out)
guint port, GError ** err_out)
{
GError *err = NULL;
GSocketAddress *saddr;
@ -205,7 +205,9 @@ gst_srt_object_resolve (GstSRTObject * srtobject, const gchar * address,
GST_DEBUG_OBJECT (srtobject->element, "resolving IP address for host %s",
address);
resolver = g_resolver_get_default ();
results = g_resolver_lookup_by_name (resolver, address, cancellable, &err);
results =
g_resolver_lookup_by_name (resolver, address, srtobject->cancellable,
&err);
if (!results)
goto name_resolve;
@ -347,6 +349,7 @@ gst_srt_object_new (GstElement * element)
srtobject = g_new0 (GstSRTObject, 1);
srtobject->element = element;
srtobject->cancellable = g_cancellable_new ();
srtobject->parameters = gst_structure_new_empty ("application/x-srt-params");
srtobject->sock = SRT_INVALID_SOCK;
srtobject->poll_id = srt_epoll_create ();
@ -382,6 +385,7 @@ gst_srt_object_destroy (GstSRTObject * srtobject)
}
g_clear_pointer (&srtobject->uri, gst_uri_unref);
g_clear_object (&srtobject->cancellable);
g_free (srtobject);
}
@ -1069,8 +1073,8 @@ reject:
}
static gboolean
gst_srt_object_wait_connect (GstSRTObject * srtobject,
GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
gst_srt_object_wait_connect (GstSRTObject * srtobject, gpointer sa,
size_t sa_len, GError ** error)
{
SRTSOCKET sock = SRT_INVALID_SOCK;
const gchar *local_address = NULL;
@ -1093,8 +1097,7 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
GST_OBJECT_UNLOCK (srtobject->element);
bind_addr =
gst_srt_object_resolve (srtobject, local_address, local_port, cancellable,
error);
gst_srt_object_resolve (srtobject, local_address, local_port, error);
if (!bind_addr) {
goto failed;
}
@ -1180,7 +1183,7 @@ failed:
}
static gboolean
gst_srt_object_connect (GstSRTObject * srtobject, GCancellable * cancellable,
gst_srt_object_connect (GstSRTObject * srtobject,
GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
GError ** error)
{
@ -1242,8 +1245,7 @@ gst_srt_object_connect (GstSRTObject * srtobject, GCancellable * cancellable,
gsize bind_sa_len;
GSocketAddress *bind_addr =
gst_srt_object_resolve (srtobject, local_address,
local_port, cancellable, error);
gst_srt_object_resolve (srtobject, local_address, local_port, error);
if (!bind_addr) {
goto failed;
@ -1294,26 +1296,23 @@ failed:
static gboolean
gst_srt_object_open_connection (GstSRTObject * srtobject,
GCancellable * cancellable, GstSRTConnectionMode connection_mode,
gpointer sa, size_t sa_len, GError ** error)
GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
GError ** error)
{
gboolean ret = FALSE;
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
ret =
gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
ret = gst_srt_object_wait_connect (srtobject, sa, sa_len, error);
} else {
ret =
gst_srt_object_connect (srtobject, cancellable, connection_mode, sa,
sa_len, error);
gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
}
return ret;
}
static gboolean
gst_srt_object_open_internal (GstSRTObject * srtobject,
GCancellable * cancellable, GError ** error)
gst_srt_object_open_internal (GstSRTObject * srtobject, GError ** error)
{
GSocketAddress *socket_address = NULL;
GstSRTConnectionMode connection_mode;
@ -1352,8 +1351,7 @@ gst_srt_object_open_internal (GstSRTObject * srtobject,
GST_OBJECT_UNLOCK (srtobject->element);
socket_address =
gst_srt_object_resolve (srtobject, addr_str, port, cancellable, error);
socket_address = gst_srt_object_resolve (srtobject, addr_str, port, error);
if (socket_address == NULL) {
goto out;
}
@ -1368,8 +1366,8 @@ gst_srt_object_open_internal (GstSRTObject * srtobject,
srtobject->listener_poll_id = srt_epoll_create ();
ret =
gst_srt_object_open_connection
(srtobject, cancellable, connection_mode, sa, sa_len, error);
gst_srt_object_open_connection (srtobject, connection_mode, sa, sa_len,
error);
GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = ret;
@ -1382,12 +1380,11 @@ out:
}
gboolean
gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
GError ** error)
gst_srt_object_open (GstSRTObject * srtobject, GError ** error)
{
srtobject->bytes = 0;
return gst_srt_object_open_internal (srtobject, cancellable, error);
return gst_srt_object_open_internal (srtobject, error);
}
void
@ -1443,8 +1440,7 @@ gst_srt_object_close (GstSRTObject * srtobject)
}
static gboolean
gst_srt_object_wait_caller (GstSRTObject * srtobject,
GCancellable * cancellable)
gst_srt_object_wait_caller (GstSRTObject * srtobject)
{
gboolean ret;
@ -1453,7 +1449,7 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
ret = (srtobject->callers != NULL);
if (!ret) {
GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
while (!ret && !g_cancellable_is_cancelled (cancellable)) {
while (!ret && !g_cancellable_is_cancelled (srtobject->cancellable)) {
g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
ret = (srtobject->callers != NULL);
}
@ -1469,9 +1465,8 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
}
gssize
gst_srt_object_read (GstSRTObject * srtobject,
guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
SRT_MSGCTRL * mctrl)
gst_srt_object_read (GstSRTObject * srtobject, guint8 * data, gsize size,
GError ** error, SRT_MSGCTRL * mctrl)
{
gssize len = 0;
gint poll_timeout;
@ -1498,7 +1493,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (!gst_srt_object_wait_caller (srtobject, cancellable))
if (!gst_srt_object_wait_caller (srtobject))
return 0;
g_mutex_lock (&srtobject->sock_lock);
@ -1514,7 +1509,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
poll_id = srtobject->poll_id;
}
while (!g_cancellable_is_cancelled (cancellable)) {
while (!g_cancellable_is_cancelled (srtobject->cancellable)) {
SRTSOCKET rsock;
gint rsocklen = 1;
@ -1526,7 +1521,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY && g_cancellable_is_cancelled (cancellable))
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
#endif
@ -1578,7 +1574,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
continue;
@ -1607,7 +1603,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
}
void
gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
gst_srt_object_unlock (GstSRTObject * srtobject)
{
GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
@ -1623,15 +1619,20 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
/* however, a race might be harmful ...
* the cancellation is used as 'flushing' flag here,
* so make sure it is so detected by the intended part at proper time */
g_cancellable_cancel (cancellable);
g_cancellable_cancel (srtobject->cancellable);
g_cond_signal (&srtobject->sock_cond);
g_mutex_unlock (&srtobject->sock_lock);
}
void
gst_srt_object_unlock_stop (GstSRTObject * srtobject)
{
g_cancellable_reset (srtobject->cancellable);
}
static gboolean
gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
gint poll_id, gint poll_timeout, GstBufferList * headers,
GCancellable * cancellable, GError ** error)
gint poll_id, gint poll_timeout, GstBufferList * headers, GError ** error)
{
guint size, i;
@ -1650,7 +1651,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
GstBuffer *buffer = gst_buffer_list_get (headers, i);
GstMapInfo mapinfo;
if (g_cancellable_is_cancelled (cancellable)) {
if (g_cancellable_is_cancelled (srtobject->cancellable)) {
return TRUE;
}
@ -1659,7 +1660,8 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY && g_cancellable_is_cancelled (cancellable))
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return TRUE;
#endif
@ -1698,8 +1700,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
static gssize
gst_srt_object_write_to_callers (GstSRTObject * srtobject,
GstBufferList * headers,
const GstMapInfo * mapinfo, GCancellable * cancellable)
GstBufferList * headers, const GstMapInfo * mapinfo)
{
GList *item, *next;
@ -1713,18 +1714,18 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
next = item->next;
if (g_cancellable_is_cancelled (cancellable)) {
if (g_cancellable_is_cancelled (srtobject->cancellable)) {
goto cancelled;
}
if (!caller->sent_headers) {
GError *error = NULL;
if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0,
headers, cancellable, &error)) {
if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0, headers,
&error)) {
GST_WARNING_OBJECT (srtobject->element,
"Failed to send headers to caller %d: %s",
caller->sock, error->message);
"Failed to send headers to caller %d: %s", caller->sock,
error->message);
g_error_free (error);
goto err;
}
@ -1767,9 +1768,8 @@ cancelled:
}
static gssize
gst_srt_object_write_one (GstSRTObject * srtobject,
GstBufferList * headers,
const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
gst_srt_object_write_one (GstSRTObject * srtobject, GstBufferList * headers,
const GstMapInfo * mapinfo, GError ** error)
{
gssize len = 0;
gint poll_timeout;
@ -1789,7 +1789,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
srtobject->poll_id, poll_timeout, headers, cancellable, error)) {
srtobject->poll_id, poll_timeout, headers, error)) {
return -1;
}
@ -1805,7 +1805,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
gint sent;
gint rest;
if (g_cancellable_is_cancelled (cancellable)) {
if (g_cancellable_is_cancelled (srtobject->cancellable)) {
break;
}
@ -1821,7 +1821,8 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
gint srt_errno = srt_getlasterror (NULL);
#if SRT_VERSION_VALUE >= 0x010402
if (srt_errno == SRT_EPOLLEMPTY && g_cancellable_is_cancelled (cancellable))
if (srt_errno == SRT_EPOLLEMPTY
&& g_cancellable_is_cancelled (srtobject->cancellable))
return 0;
#endif
@ -1861,7 +1862,7 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
}
gst_srt_object_close (srtobject);
if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
if (!gst_srt_object_open_internal (srtobject, error)) {
return -1;
}
continue;
@ -1889,9 +1890,8 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
}
gssize
gst_srt_object_write (GstSRTObject * srtobject,
GstBufferList * headers,
const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
gst_srt_object_write (GstSRTObject * srtobject, GstBufferList * headers,
const GstMapInfo * mapinfo, GError ** error)
{
gssize len = 0;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
@ -1909,16 +1909,12 @@ gst_srt_object_write (GstSRTObject * srtobject,
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (wait_for_connection) {
if (!gst_srt_object_wait_caller (srtobject, cancellable))
if (!gst_srt_object_wait_caller (srtobject))
return 0;
}
len =
gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
cancellable);
len = gst_srt_object_write_to_callers (srtobject, headers, mapinfo);
} else {
len =
gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
error);
len = gst_srt_object_write_one (srtobject, headers, mapinfo, error);
}
return len;

View file

@ -49,6 +49,7 @@ typedef struct _GstSRTObject GstSRTObject;
struct _GstSRTObject
{
GstElement *element;
GCancellable *cancellable;
GstUri *uri;
GstStructure *parameters;
@ -82,7 +83,6 @@ GstSRTObject *gst_srt_object_new (GstElement *element);
void gst_srt_object_destroy (GstSRTObject *srtobject);
gboolean gst_srt_object_open (GstSRTObject *srtobject,
GCancellable *cancellable,
GError **error);
void gst_srt_object_close (GstSRTObject *srtobject);
@ -101,18 +101,16 @@ gboolean gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar *u
gssize gst_srt_object_read (GstSRTObject * srtobject,
guint8 *data, gsize size,
GCancellable *cancellable,
GError **err,
SRT_MSGCTRL *mctrl);
gssize gst_srt_object_write (GstSRTObject * srtobject,
GstBufferList * headers,
const GstMapInfo * mapinfo,
GCancellable *cancellable,
GError **err);
void gst_srt_object_wakeup (GstSRTObject * srtobject,
GCancellable *cancellable);
void gst_srt_object_unlock (GstSRTObject * srtobject);
void gst_srt_object_unlock_stop (GstSRTObject * srtobject);
GstStructure *gst_srt_object_get_stats (GstSRTObject * srtobject);

View file

@ -130,7 +130,6 @@ gst_srt_sink_finalize (GObject * object)
{
GstSRTSink *self = GST_SRT_SINK (object);
g_clear_object (&self->cancellable);
gst_srt_object_destroy (self->srtobject);
G_OBJECT_CLASS (parent_class)->finalize (object);
@ -140,8 +139,6 @@ static void
gst_srt_sink_init (GstSRTSink * self)
{
self->srtobject = gst_srt_object_new (GST_ELEMENT (self));
self->cancellable = g_cancellable_new ();
gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
}
@ -153,7 +150,7 @@ gst_srt_sink_start (GstBaseSink * bsink)
GError *error = NULL;
gboolean ret = FALSE;
ret = gst_srt_object_open (self->srtobject, self->cancellable, &error);
ret = gst_srt_object_open (self->srtobject, &error);
if (!ret) {
/* ensure error is posted since state change will fail */
@ -184,7 +181,7 @@ gst_srt_sink_render (GstBaseSink * sink, GstBuffer * buffer)
GstMapInfo info;
GError *error = NULL;
if (g_cancellable_is_cancelled (self->cancellable)) {
if (g_cancellable_is_cancelled (self->srtobject->cancellable)) {
ret = GST_FLOW_FLUSHING;
}
@ -200,8 +197,7 @@ gst_srt_sink_render (GstBaseSink * sink, GstBuffer * buffer)
return GST_FLOW_ERROR;
}
if (gst_srt_object_write (self->srtobject, self->headers, &info,
self->cancellable, &error) < 0) {
if (gst_srt_object_write (self->srtobject, self->headers, &info, &error) < 0) {
GST_ELEMENT_ERROR (self, RESOURCE, WRITE,
("Failed to write to SRT socket: %s",
error ? error->message : "Unknown error"), (NULL));
@ -229,7 +225,7 @@ gst_srt_sink_unlock (GstBaseSink * bsink)
{
GstSRTSink *self = GST_SRT_SINK (bsink);
gst_srt_object_wakeup (self->srtobject, self->cancellable);
gst_srt_object_unlock (self->srtobject);
return TRUE;
}
@ -239,7 +235,7 @@ gst_srt_sink_unlock_stop (GstBaseSink * bsink)
{
GstSRTSink *self = GST_SRT_SINK (bsink);
g_cancellable_reset (self->cancellable);
gst_srt_object_unlock_stop (self->srtobject);
return TRUE;
}

View file

@ -48,7 +48,6 @@ struct _GstSRTSink {
GstBufferList *headers;
GstSRTObject *srtobject;
GCancellable *cancellable;
};
struct _GstSRTSinkClass {

View file

@ -116,7 +116,7 @@ gst_srt_src_start (GstBaseSrc * bsrc)
GError *error = NULL;
gboolean ret = FALSE;
ret = gst_srt_object_open (self->srtobject, self->cancellable, &error);
ret = gst_srt_object_open (self->srtobject, &error);
if (!ret) {
/* ensure error is posted since state change will fail */
@ -157,7 +157,7 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
SRT_MSGCTRL mctrl;
retry:
if (g_cancellable_is_cancelled (self->cancellable)) {
if (g_cancellable_is_cancelled (self->srtobject->cancellable)) {
ret = GST_FLOW_FLUSHING;
}
@ -178,7 +178,7 @@ retry:
base_time = gst_element_get_base_time (GST_ELEMENT (src));
recv_len = gst_srt_object_read (self->srtobject, info.data,
gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl);
gst_buffer_get_size (outbuf), &err, &mctrl);
/* Capture clock values ASAP */
capture_time = gst_clock_get_time (clock);
@ -197,7 +197,7 @@ retry:
"recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%"
G_GINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime);
if (g_cancellable_is_cancelled (self->cancellable)) {
if (g_cancellable_is_cancelled (self->srtobject->cancellable)) {
ret = GST_FLOW_FLUSHING;
goto out;
}
@ -276,7 +276,6 @@ static void
gst_srt_src_init (GstSRTSrc * self)
{
self->srtobject = gst_srt_object_new (GST_ELEMENT (self));
self->cancellable = g_cancellable_new ();
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
@ -292,7 +291,6 @@ gst_srt_src_finalize (GObject * object)
{
GstSRTSrc *self = GST_SRT_SRC (object);
g_clear_object (&self->cancellable);
gst_srt_object_destroy (self->srtobject);
G_OBJECT_CLASS (parent_class)->finalize (object);
@ -303,7 +301,7 @@ gst_srt_src_unlock (GstBaseSrc * bsrc)
{
GstSRTSrc *self = GST_SRT_SRC (bsrc);
gst_srt_object_wakeup (self->srtobject, self->cancellable);
gst_srt_object_unlock (self->srtobject);
return TRUE;
}
@ -313,7 +311,7 @@ gst_srt_src_unlock_stop (GstBaseSrc * bsrc)
{
GstSRTSrc *self = GST_SRT_SRC (bsrc);
g_cancellable_reset (self->cancellable);
gst_srt_object_unlock_stop (self->srtobject);
return TRUE;
}

View file

@ -48,7 +48,6 @@ struct _GstSRTSrc {
GstCaps *caps;
GstSRTObject *srtobject;
GCancellable *cancellable;
guint32 next_pktseq;
gboolean keep_listening;