rtmp2: Replace stats queue with stats lock

Making the thread receiving the stats wait on the loop to respond was
not a good idea, as the latter can get blocked on the streaming thread.

Have get_stats read the values directly, adding a lock to ensure we
don't read garbage.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1550>
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-09-01 13:28:44 +02:00 committed by GStreamer Merge Bot
parent ebc057bb7a
commit 16a07d303a
3 changed files with 38 additions and 46 deletions

View file

@ -1216,18 +1216,7 @@ gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
g_mutex_lock (&self->lock);
if (self->connection) {
GstRtmpConnection *connection = g_object_ref (self->connection);
g_mutex_unlock (&self->lock);
/* We need to do this without holding the lock as the g_async_queue_pop
* waits on the loop thread to deliver the stats. The loop thread might
* attempt to take the lock as well, leading to a deadlock. */
s = gst_rtmp_connection_get_stats (connection);
g_mutex_lock (&self->lock);
g_object_unref (connection);
s = gst_rtmp_connection_get_stats (self->connection);
} else if (self->stats) {
s = gst_structure_copy (self->stats);
} else {

View file

@ -1008,18 +1008,7 @@ gst_rtmp2_src_get_stats (GstRtmp2Src * self)
g_mutex_lock (&self->lock);
if (self->connection) {
GstRtmpConnection *connection = g_object_ref (self->connection);
g_mutex_unlock (&self->lock);
/* We need to do this without holding the lock as the g_async_queue_pop
* waits on the loop thread to deliver the stats. The loop thread might
* attempt to take the lock as well, leading to a deadlock. */
s = gst_rtmp_connection_get_stats (connection);
g_mutex_lock (&self->lock);
g_object_unref (connection);
s = gst_rtmp_connection_get_stats (self->connection);
} else if (self->stats) {
s = gst_structure_copy (self->stats);
} else {

View file

@ -52,7 +52,7 @@ struct _GstRtmpConnection
GSocketConnection *connection;
GCancellable *cancellable;
GSocketClient *socket_client;
GAsyncQueue *output_queue, *stats_queue;
GAsyncQueue *output_queue;
GMainContext *main_context;
GSource *input_source;
@ -73,6 +73,12 @@ struct _GstRtmpConnection
gboolean writing;
/* Protects the values below during concurrent access.
* - Taken by the loop thread when writing, but not reading.
* - Taken by other threads when reading (calling get_stats).
*/
GMutex stats_lock;
/* RTMP configuration */
guint32 in_chunk_size;
guint32 out_chunk_size, out_chunk_size_pending;
@ -248,8 +254,6 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
rtmpconnection->stats_queue =
g_async_queue_new_full ((GDestroyNotify) gst_structure_free);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
@ -258,6 +262,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
rtmpconnection->input_needed_bytes = 1;
g_mutex_init (&rtmpconnection->stats_lock);
}
void
@ -284,10 +290,10 @@ gst_rtmp_connection_finalize (GObject * object)
/* clean up object here */
g_mutex_clear (&rtmpconnection->stats_lock);
g_clear_object (&rtmpconnection->cancellable);
g_clear_object (&rtmpconnection->connection);
g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref);
g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
g_clear_pointer (&rtmpconnection->output_streams,
gst_rtmp_chunk_streams_free);
@ -468,7 +474,10 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
g_mutex_lock (&sc->stats_lock);
sc->in_bytes_total += ret;
g_mutex_unlock (&sc->stats_lock);
bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
gst_rtmp_connection_send_ack (sc);
@ -569,7 +578,9 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
&bytes_written, &error);
g_mutex_lock (&self->stats_lock);
self->out_bytes_total += bytes_written;
g_mutex_unlock (&self->stats_lock);
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@ -923,7 +934,9 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
"peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
}
g_mutex_lock (&self->stats_lock);
self->in_chunk_size = chunk_size;
g_mutex_unlock (&self->stats_lock);
}
static void
@ -948,7 +961,9 @@ gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
new_ack - last_ack);
g_mutex_lock (&self->stats_lock);
self->out_bytes_acked = new_ack;
g_mutex_unlock (&self->stats_lock);
}
static void
@ -961,7 +976,9 @@ gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
window_ack_size);
}
g_mutex_lock (&self->stats_lock);
self->in_window_ack_size = window_ack_size;
g_mutex_unlock (&self->stats_lock);
}
static gboolean
@ -1173,7 +1190,9 @@ gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
g_mutex_lock (&connection->stats_lock);
connection->in_bytes_acked = in_bytes_total;
g_mutex_unlock (&connection->stats_lock);
}
static void
@ -1301,15 +1320,23 @@ gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
chunk_size = self->out_chunk_size_pending;
if (chunk_size) {
self->out_chunk_size = chunk_size;
self->out_chunk_size_pending = 0;
g_mutex_lock (&self->stats_lock);
self->out_chunk_size = chunk_size;
g_mutex_unlock (&self->stats_lock);
GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
}
window_ack_size = self->out_window_ack_size_pending;
if (window_ack_size) {
self->out_window_ack_size = window_ack_size;
self->out_window_ack_size_pending = 0;
g_mutex_lock (&self->stats_lock);
self->out_window_ack_size = window_ack_size;
g_mutex_unlock (&self->stats_lock);
GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
@ -1335,14 +1362,6 @@ gst_rtmp_connection_get_null_stats (void)
return get_stats (NULL);
}
static gboolean
get_stats_invoker (gpointer ptr)
{
GstRtmpConnection *self = ptr;
g_async_queue_push (self->stats_queue, get_stats (self));
return G_SOURCE_REMOVE;
}
GstStructure *
gst_rtmp_connection_get_stats (GstRtmpConnection * self)
{
@ -1350,14 +1369,9 @@ gst_rtmp_connection_get_stats (GstRtmpConnection * self)
g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
if (g_main_context_acquire (self->main_context)) {
s = get_stats (self);
g_main_context_release (self->main_context);
} else {
g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH,
get_stats_invoker, g_object_ref (self), g_object_unref);
s = g_async_queue_pop (self->stats_queue);
}
g_mutex_lock (&self->stats_lock);
s = get_stats (self);
g_mutex_unlock (&self->stats_lock);
return s;
}