rtmp2/connection: Separate inner from outer cancelling

The connection cancels itself when it is closed. To avoid the
cancellable passed to `gst_rtmp_connection_new` from being unexpectedly
cancelled, separate inner from outer cancellation by holding two
cancellables.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1558

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2111>
This commit is contained in:
Jan Alexander Steffens (heftig) 2021-03-26 12:20:07 +01:00 committed by GStreamer Marge Bot
parent f369ca1b32
commit 0b916e7cec

View file

@ -55,6 +55,9 @@ struct _GstRtmpConnection
GAsyncQueue *output_queue; GAsyncQueue *output_queue;
GMainContext *main_context; GMainContext *main_context;
GCancellable *outer_cancellable;
gulong cancel_handler_id;
GSource *input_source; GSource *input_source;
GByteArray *input_bytes; GByteArray *input_bytes;
guint input_needed_bytes; guint input_needed_bytes;
@ -101,6 +104,8 @@ typedef struct
static void gst_rtmp_connection_dispose (GObject * object); static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object); static void gst_rtmp_connection_finalize (GObject * object);
static void gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
GCancellable * cancellable);
static void gst_rtmp_connection_emit_error (GstRtmpConnection * self); static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
static gboolean gst_rtmp_connection_input_ready (GInputStream * is, static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data); gpointer user_data);
@ -251,6 +256,7 @@ gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
static void static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{ {
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue = rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref); g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new (); rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
@ -277,6 +283,7 @@ gst_rtmp_connection_dispose (GObject * object)
g_cancellable_cancel (rtmpconnection->cancellable); g_cancellable_cancel (rtmpconnection->cancellable);
gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL); gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL); gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
gst_rtmp_connection_set_cancellable (rtmpconnection, NULL);
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object); G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
} }
@ -332,6 +339,24 @@ gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
g_source_attach (sc->input_source, sc->main_context); g_source_attach (sc->input_source, sc->main_context);
} }
static void
gst_rtmp_connection_set_cancellable (GstRtmpConnection * self,
GCancellable * cancellable)
{
g_cancellable_disconnect (self->outer_cancellable, self->cancel_handler_id);
g_clear_object (&self->outer_cancellable);
self->cancel_handler_id = 0;
if (cancellable == NULL)
return;
self->outer_cancellable = g_object_ref (cancellable);
self->cancel_handler_id =
g_cancellable_connect (cancellable, G_CALLBACK (g_cancellable_cancel),
g_object_ref (self->cancellable), g_object_unref);
}
GstRtmpConnection * GstRtmpConnection *
gst_rtmp_connection_new (GSocketConnection * connection, gst_rtmp_connection_new (GSocketConnection * connection,
GCancellable * cancellable) GCancellable * cancellable)
@ -339,12 +364,9 @@ gst_rtmp_connection_new (GSocketConnection * connection,
GstRtmpConnection *sc; GstRtmpConnection *sc;
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
if (cancellable)
sc->cancellable = g_object_ref (cancellable);
else
sc->cancellable = g_cancellable_new ();
gst_rtmp_connection_set_socket_connection (sc, connection); gst_rtmp_connection_set_socket_connection (sc, connection);
gst_rtmp_connection_set_cancellable (sc, cancellable);
return sc; return sc;
} }