From 35165da5861767f9621315167b9473058de25572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Le=20Page?= Date: Wed, 28 Feb 2024 18:47:58 +0100 Subject: [PATCH] pitch: fix multithread accesses - fully protect accesses to the libsoundtouch API that is not thread-safe. - fully protect accesses to GstPitch members that could be read by a downstream query thread while written by an upstream streaming thread or a user thread. Part-of: --- .../ext/soundtouch/gstpitch.cc | 210 +++++++++++------- 1 file changed, 125 insertions(+), 85 deletions(-) diff --git a/subprojects/gst-plugins-bad/ext/soundtouch/gstpitch.cc b/subprojects/gst-plugins-bad/ext/soundtouch/gstpitch.cc index 85fcad88f5..76834f3707 100644 --- a/subprojects/gst-plugins-bad/ext/soundtouch/gstpitch.cc +++ b/subprojects/gst-plugins-bad/ext/soundtouch/gstpitch.cc @@ -98,6 +98,7 @@ GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS (SUPPORTED_CAPS)); static void gst_pitch_dispose (GObject * object); +static void gst_pitch_finalize (GObject * object); static void gst_pitch_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_pitch_get_property (GObject * object, @@ -136,6 +137,7 @@ gst_pitch_class_init (GstPitchClass * klass) gobject_class->set_property = gst_pitch_set_property; gobject_class->get_property = gst_pitch_get_property; gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pitch_dispose); + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pitch_finalize); g_object_class_install_property (gobject_class, ARG_PITCH, g_param_spec_float ("pitch", "Pitch", @@ -223,12 +225,23 @@ gst_pitch_dispose (GObject * object) GstPitch *pitch = GST_PITCH (object); GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); + gst_clear_event (&priv->pending_segment); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_pitch_finalize (GObject * object) +{ + GstPitch *pitch = GST_PITCH (object); + GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); + if (priv->st) { delete priv->st; priv->st = NULL; } - G_OBJECT_CLASS (parent_class)->dispose (object); + G_OBJECT_CLASS (parent_class)->finalize (object); } static void @@ -247,9 +260,9 @@ gst_pitch_set_property (GObject * object, guint prop_id, GstPitch *pitch = GST_PITCH (object); GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); - GST_OBJECT_LOCK (pitch); switch (prop_id) { case ARG_TEMPO: + GST_OBJECT_LOCK (pitch); pitch->tempo = g_value_get_float (value); priv->stream_time_ratio = pitch->tempo * pitch->rate * pitch->segment_applied_rate; @@ -258,6 +271,7 @@ gst_pitch_set_property (GObject * object, guint prop_id, gst_pitch_update_duration (pitch); break; case ARG_RATE: + GST_OBJECT_LOCK (pitch); pitch->rate = g_value_get_float (value); priv->stream_time_ratio = pitch->tempo * pitch->rate * pitch->segment_applied_rate; @@ -267,17 +281,18 @@ gst_pitch_set_property (GObject * object, guint prop_id, break; case ARG_OUTPUT_RATE: /* Has no effect until the next input segment */ + GST_OBJECT_LOCK (pitch); pitch->output_rate = g_value_get_float (value); GST_OBJECT_UNLOCK (pitch); break; case ARG_PITCH: + GST_OBJECT_LOCK (pitch); pitch->pitch = g_value_get_float (value); priv->st->setPitch (pitch->pitch); GST_OBJECT_UNLOCK (pitch); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - GST_OBJECT_UNLOCK (pitch); break; } } @@ -288,43 +303,47 @@ gst_pitch_get_property (GObject * object, guint prop_id, { GstPitch *pitch = GST_PITCH (object); - GST_OBJECT_LOCK (pitch); switch (prop_id) { case ARG_TEMPO: + GST_OBJECT_LOCK (pitch); g_value_set_float (value, pitch->tempo); + GST_OBJECT_UNLOCK (pitch); break; case ARG_RATE: + GST_OBJECT_LOCK (pitch); g_value_set_float (value, pitch->rate); + GST_OBJECT_UNLOCK (pitch); break; case ARG_OUTPUT_RATE: + GST_OBJECT_LOCK (pitch); g_value_set_float (value, pitch->output_rate); + GST_OBJECT_UNLOCK (pitch); break; case ARG_PITCH: + GST_OBJECT_LOCK (pitch); g_value_set_float (value, pitch->pitch); + GST_OBJECT_UNLOCK (pitch); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } - GST_OBJECT_UNLOCK (pitch); } static gboolean gst_pitch_setcaps (GstPitch * pitch, GstCaps * caps) { - GstPitchPrivate *priv; - - priv = GST_PITCH_GET_PRIVATE (pitch); - - if (!gst_audio_info_from_caps (&pitch->info, caps)) + GstAudioInfo info; + if (!gst_audio_info_from_caps (&info, caps)) return FALSE; GST_OBJECT_LOCK (pitch); + pitch->info = info; /* notify the soundtouch instance of this change */ + GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); priv->st->setSampleRate (pitch->info.rate); priv->st->setChannels (pitch->info.channels); - GST_OBJECT_UNLOCK (pitch); return TRUE; @@ -336,6 +355,8 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer) { gint samples; + GST_OBJECT_LOCK (pitch); + GST_BUFFER_TIMESTAMP (buffer) = pitch->next_buffer_time; pitch->next_buffer_time += GST_BUFFER_DURATION (buffer); @@ -348,6 +369,8 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer) "] (%d samples)", GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), GST_TIME_ARGS (pitch->next_buffer_time), samples); + GST_OBJECT_UNLOCK (pitch); + return gst_pad_push (pitch->srcpad, buffer); } @@ -355,35 +378,36 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer) static GstBuffer * gst_pitch_prepare_buffer (GstPitch * pitch) { - GstPitchPrivate *priv; - guint samples; - GstBuffer *buffer; - GstMapInfo info; - - priv = GST_PITCH_GET_PRIVATE (pitch); - + GstBuffer *buffer = NULL; GST_LOG_OBJECT (pitch, "preparing buffer"); - samples = priv->st->numSamples (); - if (samples == 0) - return NULL; + GST_OBJECT_LOCK (pitch); - buffer = gst_buffer_new_and_alloc (samples * pitch->info.bpf); + GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); + guint samples = priv->st->numSamples (); + if (samples > 0) { + buffer = gst_buffer_new_and_alloc (samples * pitch->info.bpf); - gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE); - samples = - priv->st->receiveSamples ((soundtouch::SAMPLETYPE *) info.data, samples); - gst_buffer_unmap (buffer, &info); + GstMapInfo info; + gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE); - if (samples <= 0) { - gst_buffer_unref (buffer); - return NULL; + samples = + priv->st->receiveSamples ((soundtouch::SAMPLETYPE *) info.data, + samples); + gst_buffer_unmap (buffer, &info); + + if (samples > 0) { + GST_BUFFER_DURATION (buffer) = + gst_util_uint64_scale (samples, GST_SECOND, pitch->info.rate); + /* temporary store samples here, to avoid having to recalculate this */ + GST_BUFFER_OFFSET (buffer) = (gint64) samples; + } else { + gst_buffer_unref (buffer); + buffer = NULL; + } } - GST_BUFFER_DURATION (buffer) = - gst_util_uint64_scale (samples, GST_SECOND, pitch->info.rate); - /* temporary store samples here, to avoid having to recalculate this */ - GST_BUFFER_OFFSET (buffer) = (gint64) samples; + GST_OBJECT_UNLOCK (pitch); return buffer; } @@ -397,10 +421,12 @@ gst_pitch_flush_buffer (GstPitch * pitch, gboolean send) GstBuffer *buffer; GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); + GST_OBJECT_LOCK (pitch); if (priv->st->numUnprocessedSamples () != 0) { - GST_DEBUG_OBJECT (pitch, "flushing buffer"); + GST_DEBUG_OBJECT (pitch, "flushing SoundTouch buffer"); priv->st->flush (); } + GST_OBJECT_UNLOCK (pitch); if (!send) return GST_FLOW_OK; @@ -560,6 +586,7 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query) gfloat stream_time_ratio; gint64 next_buffer_offset; GstClockTime next_buffer_time; + GstClockTimeDiff min_latency, max_latency; pitch = GST_PITCH (parent); GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); @@ -570,6 +597,8 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query) stream_time_ratio = priv->stream_time_ratio; next_buffer_time = pitch->next_buffer_time; next_buffer_offset = pitch->next_buffer_offset; + min_latency = pitch->min_latency; + max_latency = pitch->max_latency; GST_OBJECT_UNLOCK (pitch); switch (GST_QUERY_TYPE (query)) { @@ -654,12 +683,11 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_DEBUG ("Our latency: min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, - GST_TIME_ARGS (pitch->min_latency), - GST_TIME_ARGS (pitch->max_latency)); + GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency)); - min += pitch->min_latency; + min += min_latency; if (max != GST_CLOCK_TIME_NONE) - max += pitch->max_latency; + max += max_latency; GST_DEBUG ("Calculated total latency : min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, @@ -693,6 +721,7 @@ gst_pitch_process_segment (GstPitch * pitch, GstEvent ** event) GstSegment seg; g_return_val_if_fail (event, FALSE); + g_return_val_if_fail (*event, FALSE); GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); @@ -765,26 +794,37 @@ gst_pitch_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_STOP: gst_pitch_flush_buffer (pitch, FALSE); + GST_OBJECT_LOCK (pitch); priv->st->clear (); pitch->next_buffer_offset = 0; pitch->next_buffer_time = GST_CLOCK_TIME_NONE; pitch->min_latency = pitch->max_latency = 0; + GST_OBJECT_UNLOCK (pitch); break; case GST_EVENT_EOS: gst_pitch_flush_buffer (pitch, TRUE); + GST_OBJECT_LOCK (pitch); priv->st->clear (); pitch->min_latency = pitch->max_latency = 0; + GST_OBJECT_UNLOCK (pitch); break; case GST_EVENT_SEGMENT: if (!gst_pitch_process_segment (pitch, &event)) { GST_LOG_OBJECT (pad, "not enough data known, stalling segment"); - if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment) - gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment); - GST_PITCH_GET_PRIVATE (pitch)->pending_segment = event; + + GST_OBJECT_LOCK (pitch); + if (priv->pending_segment) + gst_event_unref (priv->pending_segment); + priv->pending_segment = event; + GST_OBJECT_UNLOCK (pitch); + event = NULL; } + + GST_OBJECT_LOCK (pitch); priv->st->clear (); pitch->min_latency = pitch->max_latency = 0; + GST_OBJECT_UNLOCK (pitch); break; case GST_EVENT_CAPS: { @@ -814,7 +854,9 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp) { GstClockTimeDiff current_latency, min_latency, max_latency; GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); + gboolean post_new_latency_message = FALSE; + GST_OBJECT_LOCK (pitch); current_latency = (GstClockTimeDiff) (timestamp / priv->stream_time_ratio) - pitch->next_buffer_time; @@ -825,12 +867,15 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp) if (pitch->min_latency != min_latency || pitch->max_latency != max_latency) { pitch->min_latency = min_latency; pitch->max_latency = max_latency; + post_new_latency_message = TRUE; + } + GST_OBJECT_UNLOCK (pitch); + if (post_new_latency_message) { /* FIXME: what about the LATENCY event? It only has * one latency value, should it be current, min or max? * Should it include upstream latencies? */ - gst_element_post_message (GST_ELEMENT (pitch), gst_message_new_latency (GST_OBJECT (pitch))); } @@ -839,74 +884,72 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp) static GstFlowReturn gst_pitch_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { - GstPitch *pitch; - GstPitchPrivate *priv; - GstClockTime timestamp; - GstMapInfo info; + GstPitch *pitch = GST_PITCH (parent); + GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); - pitch = GST_PITCH (parent); - priv = GST_PITCH_GET_PRIVATE (pitch); + GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer); - timestamp = GST_BUFFER_TIMESTAMP (buffer); + GST_OBJECT_LOCK (pitch); + GstClockTime next_buffer_time = pitch->next_buffer_time; + gfloat stream_time_ratio = priv->stream_time_ratio; + gint bytes_per_frame = pitch->info.bpf; + GstEvent *pending_segment = priv->pending_segment; + priv->pending_segment = NULL; + GST_OBJECT_UNLOCK (pitch); // Remember the first time and corresponding offset - if (!GST_CLOCK_TIME_IS_VALID (pitch->next_buffer_time)) { - gfloat stream_time_ratio; + if (!GST_CLOCK_TIME_IS_VALID (next_buffer_time)) { GstFormat out_format = GST_FORMAT_DEFAULT; + gint64 next_buffer_offset; + + next_buffer_time = timestamp / stream_time_ratio; + gst_pitch_convert (pitch, GST_FORMAT_TIME, timestamp, &out_format, + &next_buffer_offset); GST_OBJECT_LOCK (pitch); - stream_time_ratio = priv->stream_time_ratio; + pitch->next_buffer_time = next_buffer_time; + pitch->next_buffer_offset = next_buffer_offset; GST_OBJECT_UNLOCK (pitch); - - pitch->next_buffer_time = timestamp / stream_time_ratio; - gst_pitch_convert (pitch, GST_FORMAT_TIME, timestamp, &out_format, - &pitch->next_buffer_offset); } - gst_object_sync_values (GST_OBJECT (pitch), pitch->next_buffer_time); + gst_object_sync_values (GST_OBJECT (pitch), next_buffer_time); /* push the received samples on the soundtouch buffer */ GST_LOG_OBJECT (pitch, "incoming buffer (%d samples) %" GST_TIME_FORMAT, - (gint) (gst_buffer_get_size (buffer) / pitch->info.bpf), + (gint) (gst_buffer_get_size (buffer) / bytes_per_frame), GST_TIME_ARGS (timestamp)); - if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment) { - GstEvent *event = - gst_event_copy (GST_PITCH_GET_PRIVATE (pitch)->pending_segment); - + if (pending_segment) { GST_LOG_OBJECT (pitch, "processing stalled segment"); - if (!gst_pitch_process_segment (pitch, &event)) { + + if (!gst_pitch_process_segment (pitch, &pending_segment)) { gst_buffer_unref (buffer); - gst_event_unref (event); + gst_event_unref (pending_segment); return GST_FLOW_ERROR; } - if (!gst_pad_event_default (pitch->sinkpad, parent, event)) { + if (!gst_pad_event_default (pitch->sinkpad, parent, pending_segment)) { gst_buffer_unref (buffer); - gst_event_unref (event); return GST_FLOW_ERROR; } - - gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment); - GST_PITCH_GET_PRIVATE (pitch)->pending_segment = NULL; } + GstMapInfo info; gst_buffer_map (buffer, &info, GST_MAP_READ); GST_OBJECT_LOCK (pitch); priv->st->putSamples ((soundtouch::SAMPLETYPE *) info.data, - info.size / pitch->info.bpf); + info.size / bytes_per_frame); + gboolean has_output_samples_available = !priv->st->isEmpty (); GST_OBJECT_UNLOCK (pitch); gst_buffer_unmap (buffer, &info); gst_buffer_unref (buffer); /* Calculate latency */ - gst_pitch_update_latency (pitch, timestamp); - /* and try to extract some samples from the soundtouch buffer */ - if (!priv->st->isEmpty ()) { - GstBuffer *out_buffer; - out_buffer = gst_pitch_prepare_buffer (pitch); + /* and try to extract some samples from the soundtouch buffer */ + if (has_output_samples_available) { + GstBuffer *out_buffer = gst_pitch_prepare_buffer (pitch); if (out_buffer) return gst_pitch_forward_buffer (pitch, out_buffer); } @@ -922,15 +965,13 @@ gst_pitch_change_state (GstElement * element, GstStateChange transition) GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch); switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - break; case GST_STATE_CHANGE_READY_TO_PAUSED: + GST_OBJECT_LOCK (pitch); pitch->next_buffer_time = GST_CLOCK_TIME_NONE; pitch->next_buffer_offset = 0; priv->st->clear (); pitch->min_latency = pitch->max_latency = 0; - break; - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_OBJECT_UNLOCK (pitch); break; default: break; @@ -941,15 +982,14 @@ gst_pitch_change_state (GstElement * element, GstStateChange transition) return ret; switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - break; case GST_STATE_CHANGE_PAUSED_TO_READY: - if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment) { - gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment); - GST_PITCH_GET_PRIVATE (pitch)->pending_segment = NULL; + GST_OBJECT_LOCK (pitch); + if (priv->pending_segment) { + gst_event_unref (priv->pending_segment); + priv->pending_segment = NULL; } + GST_OBJECT_UNLOCK (pitch); break; - case GST_STATE_CHANGE_READY_TO_NULL: default: break; }