webrtcdsp: Rewrite echo data synchronization

The previous code would run out of sync if there was packet lost
or clock skews. When that happened, the echo cancellation feature would
completely stop working. As this is crucial for audio calls, this patch
re-implement synchronization completely.

Instead of letting it drift until next discont, we now synchronize
against the record data at every iteration. This way we simply never
let the stream drift for longer then 10ms period. We also shorter the
delay by using the latency up the probe (basically excluding the sink
latency. This is a decent delay to avoid starving in the probe queue.

https://bugzilla.gnome.org/show_bug.cgi?id=768009
This commit is contained in:
Nicolas Dufresne 2016-06-29 15:56:47 -04:00
parent e35e23b734
commit 71c9cdeff4
3 changed files with 144 additions and 124 deletions

View file

@ -173,10 +173,8 @@ struct _GstWebrtcDsp
guint period_size;
/* Protected by the stream lock */
GstClockTime timestamp;
GstAdapter *adapter;
webrtc::AudioProcessing * apm;
gint delay_ms;
/* Protected by the object lock */
gchar *probe_name;
@ -247,138 +245,97 @@ webrtc_error_to_string (gint err)
return str;
}
/* with probe object lock */
static gboolean
gst_webrtc_dsp_sync_reverse_stream (GstWebrtcDsp * self,
GstWebrtcEchoProbe * probe)
static GstBuffer *
gst_webrtc_dsp_take_buffer (GstWebrtcDsp * self)
{
GstClockTime probe_timestamp;
GstClockTimeDiff diff;
GstBuffer *buffer;
GstClockTime timestamp;
guint64 distance;
/* We need to wait for a time reference */
if (!GST_CLOCK_TIME_IS_VALID (self->timestamp))
return FALSE;
timestamp = gst_adapter_prev_pts (self->adapter, &distance);
timestamp += gst_util_uint64_scale_int (distance / self->info.bpf,
GST_SECOND, self->info.rate);
probe_timestamp = gst_adapter_prev_pts (probe->adapter, &distance);
buffer = gst_adapter_take_buffer (self->adapter, self->period_size);
if (!GST_CLOCK_TIME_IS_VALID (probe_timestamp)) {
GST_WARNING_OBJECT (self,
"Echo Probe is handling buffer without timestamp.");
return FALSE;
}
GST_BUFFER_PTS (buffer) = timestamp;
GST_BUFFER_DURATION (buffer) = 10 * GST_MSECOND;
if (gst_adapter_pts_at_discont (probe->adapter) == probe_timestamp) {
if (distance == 0)
probe->synchronized = FALSE;
}
if (gst_adapter_pts_at_discont (self->adapter) == timestamp && distance == 0) {
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
if (probe->synchronized)
return TRUE;
if (gst_adapter_available (probe->adapter) < probe->period_size
|| probe->latency == -1) {
GST_TRACE_OBJECT (self, "Echo Probe not ready yet");
return FALSE;
}
if (self->info.rate != probe->info.rate) {
GST_WARNING_OBJECT (self,
"Echo Probe has rate %i while the DSP is running at rate %i, use a "
"caps filter to ensure those are the same.",
probe->info.rate, self->info.rate);
return FALSE;
}
probe_timestamp += gst_util_uint64_scale_int (distance / probe->info.bpf,
GST_SECOND, probe->info.rate);
diff = GST_CLOCK_DIFF (probe_timestamp, self->timestamp);
self->delay_ms = (probe->latency - diff) / GST_MSECOND;
GST_DEBUG_OBJECT (probe, "Echo Probe is now synchronized");
probe->synchronized = TRUE;
return TRUE;
return buffer;
}
static void
gst_webrtc_dsp_analyze_reverse_stream (GstWebrtcDsp * self)
static GstFlowReturn
gst_webrtc_dsp_analyze_reverse_stream (GstWebrtcDsp * self,
GstClockTime rec_time)
{
GstWebrtcEchoProbe *probe = NULL;
webrtc::AudioProcessing * apm;
webrtc::AudioFrame frame;
gint err;
GstFlowReturn ret = GST_FLOW_OK;
gint err, delay;
GST_OBJECT_LOCK (self);
if (self->echo_cancel)
probe = GST_WEBRTC_ECHO_PROBE (g_object_ref (self->probe));
GST_OBJECT_UNLOCK (self);
/* If echo cancellation is disabled */
if (!probe)
return;
return GST_FLOW_OK;
apm = self->apm;
GST_WEBRTC_ECHO_PROBE_LOCK (probe);
delay = gst_webrtc_echo_probe_read (probe, rec_time, (gpointer) &frame);
if (gst_adapter_available (probe->adapter) < probe->period_size) {
GST_LOG_OBJECT (self, "No echo data yet...");
goto beach;
apm->set_stream_delay_ms (delay);
if (delay < 0)
goto done;
if (frame.sample_rate_hz_ != self->info.rate) {
GST_ELEMENT_ERROR (self, STREAM, FORMAT,
("Echo Probe has rate %i , while the DSP is running at rate %i,"
" use a caps filter to ensure those are the same.",
frame.sample_rate_hz_, self->info.rate), (NULL));
ret = GST_FLOW_ERROR;
goto done;
}
if (!gst_webrtc_dsp_sync_reverse_stream (self, probe))
goto beach;
frame.num_channels_ = probe->info.channels;
frame.sample_rate_hz_ = probe->info.rate;
frame.samples_per_channel_ = probe->period_size / probe->info.bpf;
gst_adapter_copy (probe->adapter, (guint8 *) frame.data_, 0,
probe->period_size);
gst_adapter_flush (probe->adapter, self->period_size);
if ((err = apm->AnalyzeReverseStream (&frame)) < 0)
GST_WARNING_OBJECT (self, "Reverse stream analyses failed: %s.",
webrtc_error_to_string (err));
beach:
GST_WEBRTC_ECHO_PROBE_UNLOCK (probe);
done:
gst_object_unref (probe);
return ret;
}
static GstBuffer *
gst_webrtc_dsp_process_stream (GstWebrtcDsp * self)
static GstFlowReturn
gst_webrtc_dsp_process_stream (GstWebrtcDsp * self,
GstBuffer * buffer)
{
GstBuffer *buffer;
GstMapInfo info;
webrtc::AudioProcessing * apm = self->apm;
webrtc::AudioFrame frame;
GstClockTime timestamp;
guint64 distance;
gint err;
frame.num_channels_ = self->info.channels;
frame.sample_rate_hz_ = self->info.rate;
frame.samples_per_channel_ = self->period_size / self->info.bpf;
timestamp = gst_adapter_prev_pts (self->adapter, &distance);
if (GST_CLOCK_TIME_IS_VALID (timestamp))
timestamp += gst_util_uint64_scale_int (distance / self->info.bpf,
GST_SECOND, self->info.rate);
buffer = gst_adapter_take_buffer (self->adapter, self->period_size);
if (!gst_buffer_map (buffer, &info, (GstMapFlags) GST_MAP_READWRITE)) {
gst_buffer_unref (buffer);
return NULL;
return GST_FLOW_ERROR;
}
memcpy (frame.data_, info.data, self->period_size);
apm->set_stream_delay_ms (self->delay_ms);
if ((err = apm->ProcessStream (&frame)) < 0) {
GST_WARNING_OBJECT (self, "Failed to filter the audio: %s.",
webrtc_error_to_string (err));
@ -388,18 +345,7 @@ gst_webrtc_dsp_process_stream (GstWebrtcDsp * self)
gst_buffer_unmap (buffer, &info);
GST_BUFFER_PTS (buffer) = timestamp;
GST_BUFFER_DURATION (buffer) = 10 * GST_MSECOND;
if (gst_adapter_pts_at_discont (self->adapter) == timestamp)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
else
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
if (GST_CLOCK_TIME_IS_VALID (timestamp))
self->timestamp = timestamp + GST_BUFFER_DURATION (buffer);
return buffer;
return GST_FLOW_OK;
}
static GstFlowReturn
@ -412,18 +358,9 @@ gst_webrtc_dsp_submit_input_buffer (GstBaseTransform * btrans,
GST_BUFFER_PTS (buffer) = gst_segment_to_running_time (&btrans->segment,
GST_FORMAT_TIME, GST_BUFFER_PTS (buffer));
if (!GST_CLOCK_TIME_IS_VALID (self->timestamp))
self->timestamp = GST_BUFFER_PTS (buffer);
if (is_discont) {
GST_OBJECT_LOCK (self);
if (self->echo_cancel && self->probe) {
GST_WEBRTC_ECHO_PROBE_LOCK (self->probe);
self->probe->synchronized = FALSE;
GST_WEBRTC_ECHO_PROBE_UNLOCK (self->probe);
}
GST_OBJECT_UNLOCK (self);
GST_DEBUG_OBJECT (self,
"Received discont, clearing adapter.");
gst_adapter_clear (self->adapter);
}
@ -436,15 +373,20 @@ static GstFlowReturn
gst_webrtc_dsp_generate_output (GstBaseTransform * btrans, GstBuffer ** outbuf)
{
GstWebrtcDsp *self = GST_WEBRTC_DSP (btrans);
GstFlowReturn ret;
if (gst_adapter_available (self->adapter) >= self->period_size) {
gst_webrtc_dsp_analyze_reverse_stream (self);
*outbuf = gst_webrtc_dsp_process_stream (self);
} else {
if (gst_adapter_available (self->adapter) < self->period_size) {
*outbuf = NULL;
return GST_FLOW_OK;
}
return GST_FLOW_OK;
*outbuf = gst_webrtc_dsp_take_buffer (self);
ret = gst_webrtc_dsp_analyze_reverse_stream (self, GST_BUFFER_PTS (*outbuf));
if (ret == GST_FLOW_OK)
ret = gst_webrtc_dsp_process_stream (self, *outbuf);
return ret;
}
static gboolean
@ -495,8 +437,6 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info)
GST_OBJECT_LOCK (self);
gst_adapter_clear (self->adapter);
self->timestamp = GST_CLOCK_TIME_NONE;
self->delay_ms = 0;
self->info = *info;
apm = self->apm;
@ -515,8 +455,6 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info)
probe_info = self->probe->info;
}
self->probe->synchronized = FALSE;
GST_WEBRTC_ECHO_PROBE_UNLOCK (self->probe);
}
@ -540,7 +478,7 @@ gst_webrtc_dsp_setup (GstAudioFilter * filter, const GstAudioInfo * info)
apm->high_pass_filter ()->Enable (true);
}
if (self->echo_cancel && self->probe) {
if (self->echo_cancel) {
GST_DEBUG_OBJECT (self, "Enabling Echo Cancellation");
apm->echo_cancellation ()->enable_drift_compensation (false);
apm->echo_cancellation ()

View file

@ -80,7 +80,6 @@ gst_webrtc_echo_probe_setup (GstAudioFilter * filter, const GstAudioInfo * info)
GST_WEBRTC_ECHO_PROBE_LOCK (self);
self->info = *info;
self->synchronized = FALSE;
/* WebRTC library works with 10ms buffers, compute once this size */
self->period_size = info->bpf * info->rate / 100;
@ -119,19 +118,31 @@ gst_webrtc_echo_probe_src_event (GstBaseTransform * btrans, GstEvent * event)
GstBaseTransformClass *klass;
GstWebrtcEchoProbe *self = GST_WEBRTC_ECHO_PROBE (btrans);
GstClockTime latency;
GstClockTime upstream_latency = 0;
GstQuery *query;
klass = GST_BASE_TRANSFORM_CLASS (gst_webrtc_echo_probe_parent_class);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_LATENCY:
gst_event_parse_latency (event, &latency);
query = gst_query_new_latency ();
if (gst_pad_query (btrans->srcpad, query)) {
gst_query_parse_latency (query, NULL, &upstream_latency, NULL);
if (!GST_CLOCK_TIME_IS_VALID (upstream_latency))
upstream_latency = 0;
}
GST_WEBRTC_ECHO_PROBE_LOCK (self);
self->latency = latency;
self->delay = upstream_latency / GST_MSECOND;
GST_WEBRTC_ECHO_PROBE_UNLOCK (self);
GST_DEBUG_OBJECT (self, "We have a latency of %" GST_TIME_FORMAT,
GST_TIME_ARGS (latency));
GST_DEBUG_OBJECT (self, "We have a latency of %" GST_TIME_FORMAT
" and delay of %ims", GST_TIME_ARGS (latency),
(gint) (upstream_latency / GST_MSECOND));
break;
default:
break;
@ -184,7 +195,7 @@ gst_webrtc_echo_probe_init (GstWebrtcEchoProbe * self)
gst_audio_info_init (&self->info);
g_mutex_init (&self->lock);
self->latency = -1;
self->latency = GST_CLOCK_TIME_NONE;
G_LOCK (gst_aec_probes);
gst_aec_probes = g_list_prepend (gst_aec_probes, self);
@ -254,3 +265,72 @@ gst_webrtc_release_echo_probe (GstWebrtcEchoProbe * probe)
GST_WEBRTC_ECHO_PROBE_UNLOCK (probe);
gst_object_unref (probe);
}
gint
gst_webrtc_echo_probe_read (GstWebrtcEchoProbe * self, GstClockTime rec_time,
gpointer _frame)
{
webrtc::AudioFrame * frame = (webrtc::AudioFrame *) _frame;
GstClockTimeDiff diff;
gsize avail, skip, offset, size;
gint delay = -1;
GST_WEBRTC_ECHO_PROBE_LOCK (self);
if (!GST_CLOCK_TIME_IS_VALID (self->latency))
goto done;
if (gst_adapter_available (self->adapter) == 0) {
diff = G_MAXINT64;
} else {
GstClockTime play_time;
guint64 distance;
play_time = gst_adapter_prev_pts (self->adapter, &distance);
if (GST_CLOCK_TIME_IS_VALID (play_time)) {
play_time += gst_util_uint64_scale_int (distance / self->info.bpf,
GST_SECOND, self->info.rate);
play_time += self->latency;
diff = GST_CLOCK_DIFF (rec_time, play_time) / GST_MSECOND;
} else {
/* We have no timestamp, assume perfect delay */
diff = self->delay;
}
}
avail = gst_adapter_available (self->adapter);
if (diff > self->delay) {
skip = (diff - self->delay) * self->info.rate / 1000 * self->info.bpf;
skip = MIN (self->period_size, skip);
offset = 0;
} else {
skip = 0;
offset = (self->delay - diff) * self->info.rate / 1000 * self->info.bpf;
offset = MIN (avail, offset);
}
size = MIN (avail - offset, self->period_size - skip);
if (size < self->period_size)
memset (frame->data_, 0, self->period_size);
if (size) {
gst_adapter_copy (self->adapter, (guint8 *) frame->data_ + skip,
offset, size);
gst_adapter_flush (self->adapter, offset + size);
}
frame->num_channels_ = self->info.channels;
frame->sample_rate_hz_ = self->info.rate;
frame->samples_per_channel_ = self->period_size / self->info.bpf;
delay = self->delay;
done:
GST_WEBRTC_ECHO_PROBE_UNLOCK (self);
return delay;
}

View file

@ -62,8 +62,8 @@ struct _GstWebrtcEchoProbe
/* Protected by the lock */
GstAudioInfo info;
guint period_size;
gint latency;
gboolean synchronized;
GstClockTime latency;
gint delay;
GstSegment segment;
GstAdapter *adapter;
@ -81,6 +81,8 @@ GType gst_webrtc_echo_probe_get_type (void);
GstWebrtcEchoProbe *gst_webrtc_acquire_echo_probe (const gchar * name);
void gst_webrtc_release_echo_probe (GstWebrtcEchoProbe * probe);
gint gst_webrtc_echo_probe_read (GstWebrtcEchoProbe * self,
GstClockTime rec_time, gpointer frame);
G_END_DECLS
#endif /* __GST_WEBRTC_ECHO_PROBE_H__ */