mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-27 10:40:34 +00:00
fab64c0b3a
Plugins know that they will be initialized after Gst was initialized so they can call the initialization function dedicated for the python bindings Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2675>
242 lines
8.7 KiB
Python
242 lines
8.7 KiB
Python
'''
Element that transforms audio samples to video frames representing
the waveform.

Requires matplotlib, numpy and numpy_ringbuffer

Example pipeline:

gst-launch-1.0 audiotestsrc ! audioplot window-duration=0.01 ! videoconvert ! autovideosink
'''

import gi

# Pin every GObject-introspection namespace to the 1.0 API before anything
# is pulled from gi.repository.
for _namespace in ('Gst', 'GstBase', 'GstAudio', 'GstVideo'):
    gi.require_version(_namespace, '1.0')
del _namespace

from gi.repository import (
    Gst,
    GLib,
    GObject,
    GstBase,
    GstAudio,
    GstVideo,
)

# The plotting stack is optional at install time: log through GStreamer so
# the reason is visible, then let the ImportError propagate unchanged.
try:
    import numpy as np
    import matplotlib.patheffects as pe
    from numpy_ringbuffer import RingBuffer
    from matplotlib import pyplot as plt
    from matplotlib.backends.backend_agg import FigureCanvasAgg
except ImportError:
    Gst.error('audioplot requires numpy, numpy_ringbuffer and matplotlib')
    raise


# Initialization entry point dedicated to the Python bindings.
Gst.init_python()

|
# Raw audio format names.  AUDIO_FORMATS_ALL is handled as a brace-delimited,
# comma-separated list: the braces are stripped and each entry is trimmed.
AUDIO_FORMATS = [
    name.strip()
    for name in GstAudio.AUDIO_FORMATS_ALL.strip('{ }').split(',')
]

# Sink caps: interleaved raw audio in any of the formats above; rate and
# channel count start at 1.
ICAPS = Gst.Caps(
    Gst.Structure(
        'audio/x-raw',
        format=Gst.ValueList(AUDIO_FORMATS),
        layout='interleaved',
        rate=Gst.IntRange(range(1, GLib.MAXINT)),
        channels=Gst.IntRange(range(1, GLib.MAXINT)),
    )
)

# Source caps: ARGB raw video; width and height start at 1 and the framerate
# ranges from 1/1 up to GLib.MAXINT/1.
OCAPS = Gst.Caps(
    Gst.Structure(
        'video/x-raw',
        format='ARGB',
        width=Gst.IntRange(range(1, GLib.MAXINT)),
        height=Gst.IntRange(range(1, GLib.MAXINT)),
        framerate=Gst.FractionRange(
            Gst.Fraction(1, 1),
            Gst.Fraction(GLib.MAXINT, 1),
        ),
    )
)

# Default length, in seconds, of the audio window shown in each frame.
DEFAULT_WINDOW_DURATION = 1.0

# Output geometry and framerate that caps fixation steers toward.
DEFAULT_WIDTH = 640
DEFAULT_HEIGHT = 480
DEFAULT_FRAMERATE_NUM = 25
DEFAULT_FRAMERATE_DENOM = 1


class AudioPlotFilter(GstBase.BaseTransform):
    """Turn raw audio into ARGB video frames that plot the waveform.

    Incoming samples are converted to S32, averaged in groups of
    ``thinning_factor`` values and pushed into a ring buffer sized from the
    ``window-duration`` property.  Every time enough samples for one video
    frame have been collected, the ring buffer is drawn with matplotlib and
    the rendered pixels are written into an output buffer.
    """

    __gstmetadata__ = ('AudioPlotFilter', 'Filter',
                       'Plot audio waveforms', 'Mathieu Duponchelle')

    __gsttemplates__ = (Gst.PadTemplate.new("src",
                                            Gst.PadDirection.SRC,
                                            Gst.PadPresence.ALWAYS,
                                            OCAPS),
                        Gst.PadTemplate.new("sink",
                                            Gst.PadDirection.SINK,
                                            Gst.PadPresence.ALWAYS,
                                            ICAPS))

    __gproperties__ = {
        "window-duration": (float,
                            "Window Duration",
                            "Duration of the sliding window, in seconds",
                            0.01,
                            100.0,
                            DEFAULT_WINDOW_DURATION,
                            GObject.ParamFlags.READWRITE
                            )
    }

    def __init__(self):
        GstBase.BaseTransform.__init__(self)
        self.window_duration = DEFAULT_WINDOW_DURATION
        # No figure exists until caps are set; do_set_caps() checks this to
        # decide whether an earlier figure must be closed first.
        self.fig = None

    def do_get_property(self, prop):
        """Return the value of *prop*.

        Raises:
            AttributeError: if *prop* is not ``window-duration``.
        """
        if prop.name != 'window-duration':
            raise AttributeError('unknown property %s' % prop.name)
        return self.window_duration

    def do_set_property(self, prop, value):
        """Store *value* for *prop*.

        The new duration is only read by do_set_caps(), so it affects the
        plot the next time caps are set.

        Raises:
            AttributeError: if *prop* is not ``window-duration``.
        """
        if prop.name != 'window-duration':
            raise AttributeError('unknown property %s' % prop.name)
        self.window_duration = value

    def do_transform(self, inbuf, outbuf):
        """Draw the ring buffer contents and write the frame into *outbuf*.

        *inbuf* is not read here; the samples to plot were already pushed
        into the ring buffer by do_generate_output().

        Returns:
            Gst.FlowReturn.OK
        """
        samples = np.array(self.ringbuffer)

        if not self.h:
            # First frame since the last do_set_caps(): create the line.
            self.h, = self.ax.plot(samples,
                                   lw=0.5,
                                   color='k',
                                   path_effects=[pe.Stroke(linewidth=1.0,
                                                           foreground='g'),
                                                 pe.Normal()])
        else:
            self.h.set_ydata(samples)

        # Redraw only the line on top of the background saved in
        # do_set_caps() instead of re-rendering the whole figure.
        self.fig.canvas.restore_region(self.background)
        self.ax.draw_artist(self.h)
        self.fig.canvas.blit(self.ax.bbox)

        pixels = self.agg.tostring_argb()

        outbuf.fill(0, pixels)
        outbuf.pts = self.next_time
        outbuf.duration = self.frame_duration

        self.next_time += self.frame_duration

        return Gst.FlowReturn.OK

    def __append(self, data):
        """Average *data* in groups of ``thinning_factor`` and store the means.

        Trailing samples that do not fill a complete group are dropped.
        """
        arr = np.array(data)
        group = self.thinning_factor
        # Largest multiple of the group size that fits in the input.
        end = len(arr) - len(arr) % group
        means = np.mean(arr[:end].reshape(-1, group), 1)
        self.ringbuffer.extend(means)

    def do_generate_output(self):
        """Consume samples from the queued buffer; emit a frame when one is due.

        Offsets are counted in individual sample values (all channels
        interleaved):

        * ``buf_offset``  - values of the current queued buffer already used
        * ``cur_offset``  - values consumed so far since do_set_caps()
        * ``next_offset`` - value count at which the next frame is due

        NOTE(review): the ``buf_offset`` bookkeeping implies the base class
        calls this repeatedly for the same queued buffer until no output is
        returned - confirm against the GstBaseTransform documentation.

        Returns:
            ``(Gst.FlowReturn, outbuf)`` where *outbuf* is ``None`` when the
            queued buffer is exhausted before the next frame is due, or when
            the input buffer could not be mapped (flow return ERROR).
        """
        inbuf = self.queued_buf
        mapped, info = inbuf.map(Gst.MapFlags.READ)
        if not mapped:
            Gst.error('audioplot could not map the input buffer for reading')
            return Gst.FlowReturn.ERROR, None

        # The mapping is released in ``finally`` so that an exception during
        # conversion or averaging cannot leave the buffer mapped.
        try:
            _, converted = self.converter.convert(
                GstAudio.AudioConverterFlags.NONE, info.data)
            data = memoryview(converted).cast('i')

            nsamples = len(data) - self.buf_offset

            if nsamples == 0:
                # Everything in this buffer has been consumed already.
                self.buf_offset = 0
                return Gst.FlowReturn.OK, None

            if self.cur_offset + nsamples < self.next_offset:
                # Not enough for a frame yet: take the rest and wait for
                # more input.
                self.__append(data[self.buf_offset:])
                self.buf_offset = 0
                self.cur_offset += nsamples
                return Gst.FlowReturn.OK, None

            # Take exactly what is missing to reach the next frame boundary.
            consumed = self.next_offset - self.cur_offset
            self.__append(data[self.buf_offset:self.buf_offset + consumed])
        finally:
            inbuf.unmap(info)

        _, outbuf = GstBase.BaseTransform.do_prepare_output_buffer(self, inbuf)

        ret = self.do_transform(inbuf, outbuf)

        self.next_offset += self.samplesperbuffer

        self.cur_offset += consumed
        self.buf_offset += consumed

        return ret, outbuf

    def do_transform_caps(self, direction, caps, filter_):
        """Return the caps of the opposite pad, intersected with *filter_*.

        *caps* is not inspected: the result is always the full ICAPS (for
        the SRC direction) or OCAPS (otherwise).
        """
        res = ICAPS if direction == Gst.PadDirection.SRC else OCAPS

        if filter_:
            res = res.intersect(filter_)

        return res

    def do_fixate_caps(self, direction, caps, othercaps):
        """Fixate *othercaps*, steering video caps toward the defaults.

        For the SRC direction *othercaps* is fixated as is.  Otherwise a copy
        of its first structure is fixated to the values nearest to the
        default framerate, width and height.
        """
        if direction == Gst.PadDirection.SRC:
            return othercaps.fixate()

        so = othercaps.get_structure(0).copy()
        so.fixate_field_nearest_fraction("framerate",
                                         DEFAULT_FRAMERATE_NUM,
                                         DEFAULT_FRAMERATE_DENOM)
        so.fixate_field_nearest_int("width", DEFAULT_WIDTH)
        so.fixate_field_nearest_int("height", DEFAULT_HEIGHT)
        ret = Gst.Caps.new_empty()
        ret.append_structure(so)
        return ret.fixate()

    def do_set_caps(self, icaps, ocaps):
        """Set up conversion, plotting and timing state for the given caps.

        Creates the S32 audio converter, a matplotlib figure matching the
        output frame size, the ring buffer holding the displayed window and
        the sample/time counters used by do_generate_output().

        Returns:
            True
        """
        in_info = GstAudio.AudioInfo()
        in_info.from_caps(icaps)
        out_info = GstVideo.VideoInfo()
        out_info.from_caps(ocaps)

        self.convert_info = GstAudio.AudioInfo()
        self.convert_info.set_format(GstAudio.AudioFormat.S32,
                                     in_info.rate,
                                     in_info.channels,
                                     in_info.position)
        self.converter = GstAudio.AudioConverter.new(GstAudio.AudioConverterFlags.NONE,
                                                     in_info,
                                                     self.convert_info,
                                                     None)

        # pyplot keeps every figure it creates until it is closed, so drop
        # the figure from a previous call before making a new one.
        if self.fig is not None:
            plt.close(self.fig)

        self.fig = plt.figure()
        dpi = self.fig.get_dpi()
        self.fig.patch.set_alpha(0.3)
        # Size the figure so that it renders at the negotiated frame size.
        self.fig.set_size_inches(out_info.width / float(dpi),
                                 out_info.height / float(dpi))
        # A single axes filling the whole figure, with no decorations.
        self.ax = plt.Axes(self.fig, [0., 0., 1., 1.])
        self.fig.add_axes(self.ax)
        self.ax.set_axis_off()
        self.ax.set_ylim((GLib.MININT, GLib.MAXINT))
        self.agg = self.fig.canvas.switch_backends(FigureCanvasAgg)
        self.h = None

        # Sample values (all channels) arriving per second of audio.
        values_per_second = in_info.rate * in_info.channels

        samplesperwindow = int(values_per_second * self.window_duration)
        # Average this many values into one plotted point; never below 1.
        self.thinning_factor = max(int(samplesperwindow / out_info.width - 1), 1)

        cap = samplesperwindow // self.thinning_factor
        self.ax.set_xlim([0, cap])
        self.ringbuffer = RingBuffer(capacity=cap)
        # Start with a full window of silence.
        self.ringbuffer.extend([0.0] * cap)
        self.frame_duration = Gst.util_uint64_scale_int(Gst.SECOND,
                                                        out_info.fps_d,
                                                        out_info.fps_n)
        self.next_time = self.frame_duration

        # Render once and keep the empty background for later blitting.
        self.agg.draw()
        self.background = self.fig.canvas.copy_from_bbox(self.ax.bbox)

        # Sample values that make up one video frame.
        self.samplesperbuffer = Gst.util_uint64_scale_int(values_per_second,
                                                          out_info.fps_d,
                                                          out_info.fps_n)
        self.next_offset = self.samplesperbuffer
        self.cur_offset = 0
        self.buf_offset = 0

        return True


|
# Make the Python class known to the GObject type system.
GObject.type_register(AudioPlotFilter)

# (element name, rank, class).  Presumably picked up by the GStreamer Python
# plugin loader to expose the element as "audioplot" - confirm against the
# gst-python documentation.
__gstelementfactory__ = (
    "audioplot",
    Gst.Rank.NONE,
    AudioPlotFilter,
)