diff --git a/gst/rtmp2/gstrtmp2sink.c b/gst/rtmp2/gstrtmp2sink.c index 30b51d7584..72026eb875 100644 --- a/gst/rtmp2/gstrtmp2sink.c +++ b/gst/rtmp2/gstrtmp2sink.c @@ -1216,18 +1216,7 @@ gst_rtmp2_sink_get_stats (GstRtmp2Sink * self) g_mutex_lock (&self->lock); if (self->connection) { - GstRtmpConnection *connection = g_object_ref (self->connection); - - g_mutex_unlock (&self->lock); - - /* We need to do this without holding the lock as the g_async_queue_pop - * waits on the loop thread to deliver the stats. The loop thread might - * attempt to take the lock as well, leading to a deadlock. */ - s = gst_rtmp_connection_get_stats (connection); - - g_mutex_lock (&self->lock); - - g_object_unref (connection); + s = gst_rtmp_connection_get_stats (self->connection); } else if (self->stats) { s = gst_structure_copy (self->stats); } else { diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c index 2b7a05f98a..f5c356b11e 100644 --- a/gst/rtmp2/gstrtmp2src.c +++ b/gst/rtmp2/gstrtmp2src.c @@ -1008,18 +1008,7 @@ gst_rtmp2_src_get_stats (GstRtmp2Src * self) g_mutex_lock (&self->lock); if (self->connection) { - GstRtmpConnection *connection = g_object_ref (self->connection); - - g_mutex_unlock (&self->lock); - - /* We need to do this without holding the lock as the g_async_queue_pop - * waits on the loop thread to deliver the stats. The loop thread might - * attempt to take the lock as well, leading to a deadlock. */ - s = gst_rtmp_connection_get_stats (connection); - - g_mutex_lock (&self->lock); - - g_object_unref (connection); + s = gst_rtmp_connection_get_stats (self->connection); } else if (self->stats) { s = gst_structure_copy (self->stats); } else { diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c index b9fe7675a7..22d1e69ebf 100644 --- a/gst/rtmp2/rtmp/rtmpconnection.c +++ b/gst/rtmp2/rtmp/rtmpconnection.c @@ -52,7 +52,7 @@ struct _GstRtmpConnection GSocketConnection *connection; GCancellable *cancellable; GSocketClient *socket_client; - GAsyncQueue *output_queue, *stats_queue; + GAsyncQueue *output_queue; GMainContext *main_context; GSource *input_source; @@ -73,6 +73,12 @@ struct _GstRtmpConnection gboolean writing; + /* Protects the values below during concurrent access. + * - Taken by the loop thread when writing, but not reading. + * - Taken by other threads when reading (calling get_stats). + */ + GMutex stats_lock; + /* RTMP configuration */ guint32 in_chunk_size; guint32 out_chunk_size, out_chunk_size_pending; @@ -248,8 +254,6 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) rtmpconnection->cancellable = g_cancellable_new (); rtmpconnection->output_queue = g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref); - rtmpconnection->stats_queue = - g_async_queue_new_full ((GDestroyNotify) gst_structure_free); rtmpconnection->input_streams = gst_rtmp_chunk_streams_new (); rtmpconnection->output_streams = gst_rtmp_chunk_streams_new (); @@ -258,6 +262,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE); rtmpconnection->input_needed_bytes = 1; + + g_mutex_init (&rtmpconnection->stats_lock); } void @@ -284,10 +290,10 @@ gst_rtmp_connection_finalize (GObject * object) /* clean up object here */ + g_mutex_clear (&rtmpconnection->stats_lock); g_clear_object (&rtmpconnection->cancellable); g_clear_object (&rtmpconnection->connection); g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref); - g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref); g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free); g_clear_pointer (&rtmpconnection->output_streams, gst_rtmp_chunk_streams_free); @@ -468,7 +474,10 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret); + g_mutex_lock (&sc->stats_lock); sc->in_bytes_total += ret; + g_mutex_unlock (&sc->stats_lock); + bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked; if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) { gst_rtmp_connection_send_ack (sc); @@ -569,7 +578,9 @@ gst_rtmp_connection_write_buffer_done (GObject * obj, res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, &bytes_written, &error); + g_mutex_lock (&self->stats_lock); self->out_bytes_total += bytes_written; + g_mutex_unlock (&self->stats_lock); if (!res) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { @@ -923,7 +934,9 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self, "peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size); } + g_mutex_lock (&self->stats_lock); self->in_chunk_size = chunk_size; + g_mutex_unlock (&self->stats_lock); } static void @@ -948,7 +961,9 @@ gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes) GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes", new_ack - last_ack); + g_mutex_lock (&self->stats_lock); self->out_bytes_acked = new_ack; + g_mutex_unlock (&self->stats_lock); } static void @@ -961,7 +976,9 @@ gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self, window_ack_size); } + g_mutex_lock (&self->stats_lock); self->in_window_ack_size = window_ack_size; + g_mutex_unlock (&self->stats_lock); } static gboolean @@ -1173,7 +1190,9 @@ gst_rtmp_connection_send_ack (GstRtmpConnection * connection) gst_rtmp_connection_queue_message (connection, gst_rtmp_message_new_protocol_control (&pc)); + g_mutex_lock (&connection->stats_lock); connection->in_bytes_acked = in_bytes_total; + g_mutex_unlock (&connection->stats_lock); } static void @@ -1301,15 +1320,23 @@ gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self) chunk_size = self->out_chunk_size_pending; if (chunk_size) { - self->out_chunk_size = chunk_size; self->out_chunk_size_pending = 0; + + g_mutex_lock (&self->stats_lock); + self->out_chunk_size = chunk_size; + g_mutex_unlock (&self->stats_lock); + GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size); } window_ack_size = self->out_window_ack_size_pending; if (window_ack_size) { - self->out_window_ack_size = window_ack_size; self->out_window_ack_size_pending = 0; + + g_mutex_lock (&self->stats_lock); + self->out_window_ack_size = window_ack_size; + g_mutex_unlock (&self->stats_lock); + GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT, window_ack_size); } @@ -1335,14 +1362,6 @@ gst_rtmp_connection_get_null_stats (void) return get_stats (NULL); } -static gboolean -get_stats_invoker (gpointer ptr) -{ - GstRtmpConnection *self = ptr; - g_async_queue_push (self->stats_queue, get_stats (self)); - return G_SOURCE_REMOVE; -} - GstStructure * gst_rtmp_connection_get_stats (GstRtmpConnection * self) { @@ -1350,14 +1369,9 @@ gst_rtmp_connection_get_stats (GstRtmpConnection * self) g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL); - if (g_main_context_acquire (self->main_context)) { - s = get_stats (self); - g_main_context_release (self->main_context); - } else { - g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH, - get_stats_invoker, g_object_ref (self), g_object_unref); - s = g_async_queue_pop (self->stats_queue); - } + g_mutex_lock (&self->stats_lock); + s = get_stats (self); + g_mutex_unlock (&self->stats_lock); return s; }