pitch: fix multithread accesses

- fully protect accesses to the libsoundtouch API that is not
  thread-safe.
- fully protect accesses to GstPitch members that could be read by a
  downstream query thread while written by an upstream streaming thread
  or a user thread.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6247>
This commit is contained in:
Loïc Le Page 2024-02-28 18:47:58 +01:00 committed by GStreamer Marge Bot
parent 9f5bb30d3a
commit 35165da586

View file

@ -98,6 +98,7 @@ GST_STATIC_PAD_TEMPLATE ("src",
GST_STATIC_CAPS (SUPPORTED_CAPS));
static void gst_pitch_dispose (GObject * object);
static void gst_pitch_finalize (GObject * object);
static void gst_pitch_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_pitch_get_property (GObject * object,
@ -136,6 +137,7 @@ gst_pitch_class_init (GstPitchClass * klass)
gobject_class->set_property = gst_pitch_set_property;
gobject_class->get_property = gst_pitch_get_property;
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pitch_dispose);
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pitch_finalize);
g_object_class_install_property (gobject_class, ARG_PITCH,
g_param_spec_float ("pitch", "Pitch",
@ -223,12 +225,23 @@ gst_pitch_dispose (GObject * object)
GstPitch *pitch = GST_PITCH (object);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
gst_clear_event (&priv->pending_segment);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_pitch_finalize (GObject * object)
{
GstPitch *pitch = GST_PITCH (object);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
if (priv->st) {
delete priv->st;
priv->st = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
@ -247,9 +260,9 @@ gst_pitch_set_property (GObject * object, guint prop_id,
GstPitch *pitch = GST_PITCH (object);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
GST_OBJECT_LOCK (pitch);
switch (prop_id) {
case ARG_TEMPO:
GST_OBJECT_LOCK (pitch);
pitch->tempo = g_value_get_float (value);
priv->stream_time_ratio =
pitch->tempo * pitch->rate * pitch->segment_applied_rate;
@ -258,6 +271,7 @@ gst_pitch_set_property (GObject * object, guint prop_id,
gst_pitch_update_duration (pitch);
break;
case ARG_RATE:
GST_OBJECT_LOCK (pitch);
pitch->rate = g_value_get_float (value);
priv->stream_time_ratio =
pitch->tempo * pitch->rate * pitch->segment_applied_rate;
@ -267,17 +281,18 @@ gst_pitch_set_property (GObject * object, guint prop_id,
break;
case ARG_OUTPUT_RATE:
/* Has no effect until the next input segment */
GST_OBJECT_LOCK (pitch);
pitch->output_rate = g_value_get_float (value);
GST_OBJECT_UNLOCK (pitch);
break;
case ARG_PITCH:
GST_OBJECT_LOCK (pitch);
pitch->pitch = g_value_get_float (value);
priv->st->setPitch (pitch->pitch);
GST_OBJECT_UNLOCK (pitch);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GST_OBJECT_UNLOCK (pitch);
break;
}
}
@ -288,43 +303,47 @@ gst_pitch_get_property (GObject * object, guint prop_id,
{
GstPitch *pitch = GST_PITCH (object);
GST_OBJECT_LOCK (pitch);
switch (prop_id) {
case ARG_TEMPO:
GST_OBJECT_LOCK (pitch);
g_value_set_float (value, pitch->tempo);
GST_OBJECT_UNLOCK (pitch);
break;
case ARG_RATE:
GST_OBJECT_LOCK (pitch);
g_value_set_float (value, pitch->rate);
GST_OBJECT_UNLOCK (pitch);
break;
case ARG_OUTPUT_RATE:
GST_OBJECT_LOCK (pitch);
g_value_set_float (value, pitch->output_rate);
GST_OBJECT_UNLOCK (pitch);
break;
case ARG_PITCH:
GST_OBJECT_LOCK (pitch);
g_value_set_float (value, pitch->pitch);
GST_OBJECT_UNLOCK (pitch);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
GST_OBJECT_UNLOCK (pitch);
}
static gboolean
gst_pitch_setcaps (GstPitch * pitch, GstCaps * caps)
{
GstPitchPrivate *priv;
priv = GST_PITCH_GET_PRIVATE (pitch);
if (!gst_audio_info_from_caps (&pitch->info, caps))
GstAudioInfo info;
if (!gst_audio_info_from_caps (&info, caps))
return FALSE;
GST_OBJECT_LOCK (pitch);
pitch->info = info;
/* notify the soundtouch instance of this change */
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
priv->st->setSampleRate (pitch->info.rate);
priv->st->setChannels (pitch->info.channels);
GST_OBJECT_UNLOCK (pitch);
return TRUE;
@ -336,6 +355,8 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer)
{
gint samples;
GST_OBJECT_LOCK (pitch);
GST_BUFFER_TIMESTAMP (buffer) = pitch->next_buffer_time;
pitch->next_buffer_time += GST_BUFFER_DURATION (buffer);
@ -348,6 +369,8 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer)
"] (%d samples)", GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (pitch->next_buffer_time), samples);
GST_OBJECT_UNLOCK (pitch);
return gst_pad_push (pitch->srcpad, buffer);
}
@ -355,35 +378,36 @@ gst_pitch_forward_buffer (GstPitch * pitch, GstBuffer * buffer)
static GstBuffer *
gst_pitch_prepare_buffer (GstPitch * pitch)
{
GstPitchPrivate *priv;
guint samples;
GstBuffer *buffer;
GstMapInfo info;
priv = GST_PITCH_GET_PRIVATE (pitch);
GstBuffer *buffer = NULL;
GST_LOG_OBJECT (pitch, "preparing buffer");
samples = priv->st->numSamples ();
if (samples == 0)
return NULL;
GST_OBJECT_LOCK (pitch);
buffer = gst_buffer_new_and_alloc (samples * pitch->info.bpf);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
guint samples = priv->st->numSamples ();
if (samples > 0) {
buffer = gst_buffer_new_and_alloc (samples * pitch->info.bpf);
gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE);
samples =
priv->st->receiveSamples ((soundtouch::SAMPLETYPE *) info.data, samples);
gst_buffer_unmap (buffer, &info);
GstMapInfo info;
gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE);
if (samples <= 0) {
gst_buffer_unref (buffer);
return NULL;
samples =
priv->st->receiveSamples ((soundtouch::SAMPLETYPE *) info.data,
samples);
gst_buffer_unmap (buffer, &info);
if (samples > 0) {
GST_BUFFER_DURATION (buffer) =
gst_util_uint64_scale (samples, GST_SECOND, pitch->info.rate);
/* temporary store samples here, to avoid having to recalculate this */
GST_BUFFER_OFFSET (buffer) = (gint64) samples;
} else {
gst_buffer_unref (buffer);
buffer = NULL;
}
}
GST_BUFFER_DURATION (buffer) =
gst_util_uint64_scale (samples, GST_SECOND, pitch->info.rate);
/* temporary store samples here, to avoid having to recalculate this */
GST_BUFFER_OFFSET (buffer) = (gint64) samples;
GST_OBJECT_UNLOCK (pitch);
return buffer;
}
@ -397,10 +421,12 @@ gst_pitch_flush_buffer (GstPitch * pitch, gboolean send)
GstBuffer *buffer;
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
GST_OBJECT_LOCK (pitch);
if (priv->st->numUnprocessedSamples () != 0) {
GST_DEBUG_OBJECT (pitch, "flushing buffer");
GST_DEBUG_OBJECT (pitch, "flushing SoundTouch buffer");
priv->st->flush ();
}
GST_OBJECT_UNLOCK (pitch);
if (!send)
return GST_FLOW_OK;
@ -560,6 +586,7 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
gfloat stream_time_ratio;
gint64 next_buffer_offset;
GstClockTime next_buffer_time;
GstClockTimeDiff min_latency, max_latency;
pitch = GST_PITCH (parent);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
@ -570,6 +597,8 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
stream_time_ratio = priv->stream_time_ratio;
next_buffer_time = pitch->next_buffer_time;
next_buffer_offset = pitch->next_buffer_offset;
min_latency = pitch->min_latency;
max_latency = pitch->max_latency;
GST_OBJECT_UNLOCK (pitch);
switch (GST_QUERY_TYPE (query)) {
@ -654,12 +683,11 @@ gst_pitch_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
GST_DEBUG ("Our latency: min %" GST_TIME_FORMAT
", max %" GST_TIME_FORMAT,
GST_TIME_ARGS (pitch->min_latency),
GST_TIME_ARGS (pitch->max_latency));
GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
min += pitch->min_latency;
min += min_latency;
if (max != GST_CLOCK_TIME_NONE)
max += pitch->max_latency;
max += max_latency;
GST_DEBUG ("Calculated total latency : min %"
GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
@ -693,6 +721,7 @@ gst_pitch_process_segment (GstPitch * pitch, GstEvent ** event)
GstSegment seg;
g_return_val_if_fail (event, FALSE);
g_return_val_if_fail (*event, FALSE);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
@ -765,26 +794,37 @@ gst_pitch_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
gst_pitch_flush_buffer (pitch, FALSE);
GST_OBJECT_LOCK (pitch);
priv->st->clear ();
pitch->next_buffer_offset = 0;
pitch->next_buffer_time = GST_CLOCK_TIME_NONE;
pitch->min_latency = pitch->max_latency = 0;
GST_OBJECT_UNLOCK (pitch);
break;
case GST_EVENT_EOS:
gst_pitch_flush_buffer (pitch, TRUE);
GST_OBJECT_LOCK (pitch);
priv->st->clear ();
pitch->min_latency = pitch->max_latency = 0;
GST_OBJECT_UNLOCK (pitch);
break;
case GST_EVENT_SEGMENT:
if (!gst_pitch_process_segment (pitch, &event)) {
GST_LOG_OBJECT (pad, "not enough data known, stalling segment");
if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment)
gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment);
GST_PITCH_GET_PRIVATE (pitch)->pending_segment = event;
GST_OBJECT_LOCK (pitch);
if (priv->pending_segment)
gst_event_unref (priv->pending_segment);
priv->pending_segment = event;
GST_OBJECT_UNLOCK (pitch);
event = NULL;
}
GST_OBJECT_LOCK (pitch);
priv->st->clear ();
pitch->min_latency = pitch->max_latency = 0;
GST_OBJECT_UNLOCK (pitch);
break;
case GST_EVENT_CAPS:
{
@ -814,7 +854,9 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp)
{
GstClockTimeDiff current_latency, min_latency, max_latency;
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
gboolean post_new_latency_message = FALSE;
GST_OBJECT_LOCK (pitch);
current_latency =
(GstClockTimeDiff) (timestamp / priv->stream_time_ratio) -
pitch->next_buffer_time;
@ -825,12 +867,15 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp)
if (pitch->min_latency != min_latency || pitch->max_latency != max_latency) {
pitch->min_latency = min_latency;
pitch->max_latency = max_latency;
post_new_latency_message = TRUE;
}
GST_OBJECT_UNLOCK (pitch);
if (post_new_latency_message) {
/* FIXME: what about the LATENCY event? It only has
* one latency value, should it be current, min or max?
* Should it include upstream latencies?
*/
gst_element_post_message (GST_ELEMENT (pitch),
gst_message_new_latency (GST_OBJECT (pitch)));
}
@ -839,74 +884,72 @@ gst_pitch_update_latency (GstPitch * pitch, GstClockTime timestamp)
static GstFlowReturn
gst_pitch_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstPitch *pitch;
GstPitchPrivate *priv;
GstClockTime timestamp;
GstMapInfo info;
GstPitch *pitch = GST_PITCH (parent);
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
pitch = GST_PITCH (parent);
priv = GST_PITCH_GET_PRIVATE (pitch);
GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
GST_OBJECT_LOCK (pitch);
GstClockTime next_buffer_time = pitch->next_buffer_time;
gfloat stream_time_ratio = priv->stream_time_ratio;
gint bytes_per_frame = pitch->info.bpf;
GstEvent *pending_segment = priv->pending_segment;
priv->pending_segment = NULL;
GST_OBJECT_UNLOCK (pitch);
// Remember the first time and corresponding offset
if (!GST_CLOCK_TIME_IS_VALID (pitch->next_buffer_time)) {
gfloat stream_time_ratio;
if (!GST_CLOCK_TIME_IS_VALID (next_buffer_time)) {
GstFormat out_format = GST_FORMAT_DEFAULT;
gint64 next_buffer_offset;
next_buffer_time = timestamp / stream_time_ratio;
gst_pitch_convert (pitch, GST_FORMAT_TIME, timestamp, &out_format,
&next_buffer_offset);
GST_OBJECT_LOCK (pitch);
stream_time_ratio = priv->stream_time_ratio;
pitch->next_buffer_time = next_buffer_time;
pitch->next_buffer_offset = next_buffer_offset;
GST_OBJECT_UNLOCK (pitch);
pitch->next_buffer_time = timestamp / stream_time_ratio;
gst_pitch_convert (pitch, GST_FORMAT_TIME, timestamp, &out_format,
&pitch->next_buffer_offset);
}
gst_object_sync_values (GST_OBJECT (pitch), pitch->next_buffer_time);
gst_object_sync_values (GST_OBJECT (pitch), next_buffer_time);
/* push the received samples on the soundtouch buffer */
GST_LOG_OBJECT (pitch, "incoming buffer (%d samples) %" GST_TIME_FORMAT,
(gint) (gst_buffer_get_size (buffer) / pitch->info.bpf),
(gint) (gst_buffer_get_size (buffer) / bytes_per_frame),
GST_TIME_ARGS (timestamp));
if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment) {
GstEvent *event =
gst_event_copy (GST_PITCH_GET_PRIVATE (pitch)->pending_segment);
if (pending_segment) {
GST_LOG_OBJECT (pitch, "processing stalled segment");
if (!gst_pitch_process_segment (pitch, &event)) {
if (!gst_pitch_process_segment (pitch, &pending_segment)) {
gst_buffer_unref (buffer);
gst_event_unref (event);
gst_event_unref (pending_segment);
return GST_FLOW_ERROR;
}
if (!gst_pad_event_default (pitch->sinkpad, parent, event)) {
if (!gst_pad_event_default (pitch->sinkpad, parent, pending_segment)) {
gst_buffer_unref (buffer);
gst_event_unref (event);
return GST_FLOW_ERROR;
}
gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment);
GST_PITCH_GET_PRIVATE (pitch)->pending_segment = NULL;
}
GstMapInfo info;
gst_buffer_map (buffer, &info, GST_MAP_READ);
GST_OBJECT_LOCK (pitch);
priv->st->putSamples ((soundtouch::SAMPLETYPE *) info.data,
info.size / pitch->info.bpf);
info.size / bytes_per_frame);
gboolean has_output_samples_available = !priv->st->isEmpty ();
GST_OBJECT_UNLOCK (pitch);
gst_buffer_unmap (buffer, &info);
gst_buffer_unref (buffer);
/* Calculate latency */
gst_pitch_update_latency (pitch, timestamp);
/* and try to extract some samples from the soundtouch buffer */
if (!priv->st->isEmpty ()) {
GstBuffer *out_buffer;
out_buffer = gst_pitch_prepare_buffer (pitch);
/* and try to extract some samples from the soundtouch buffer */
if (has_output_samples_available) {
GstBuffer *out_buffer = gst_pitch_prepare_buffer (pitch);
if (out_buffer)
return gst_pitch_forward_buffer (pitch, out_buffer);
}
@ -922,15 +965,13 @@ gst_pitch_change_state (GstElement * element, GstStateChange transition)
GstPitchPrivate *priv = GST_PITCH_GET_PRIVATE (pitch);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_OBJECT_LOCK (pitch);
pitch->next_buffer_time = GST_CLOCK_TIME_NONE;
pitch->next_buffer_offset = 0;
priv->st->clear ();
pitch->min_latency = pitch->max_latency = 0;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_OBJECT_UNLOCK (pitch);
break;
default:
break;
@ -941,15 +982,14 @@ gst_pitch_change_state (GstElement * element, GstStateChange transition)
return ret;
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (GST_PITCH_GET_PRIVATE (pitch)->pending_segment) {
gst_event_unref (GST_PITCH_GET_PRIVATE (pitch)->pending_segment);
GST_PITCH_GET_PRIVATE (pitch)->pending_segment = NULL;
GST_OBJECT_LOCK (pitch);
if (priv->pending_segment) {
gst_event_unref (priv->pending_segment);
priv->pending_segment = NULL;
}
GST_OBJECT_UNLOCK (pitch);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
default:
break;
}