mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-20 23:36:38 +00:00
193 lines
5.7 KiB
Python
193 lines
5.7 KiB
Python
'''
Element that generates a sine audio wave with the specified frequency

Requires numpy

Example pipeline:

gst-launch-1.0 py_audiotestsrc ! autoaudiosink
'''
|
|
|
|
import gi
|
|
|
|
gi.require_version('Gst', '1.0')
|
|
gi.require_version('GstBase', '1.0')
|
|
gi.require_version('GstAudio', '1.0')
|
|
|
|
from gi.repository import Gst, GLib, GObject, GstBase, GstAudio
|
|
|
|
# numpy does the actual waveform math; if it is missing, report the
# problem through the GStreamer log and re-raise the ImportError.
try:
    import numpy as np
except ImportError:
    Gst.error('py_audiotestsrc requires numpy')
    raise
|
|
|
|
# The only format this element advertises on its source pad template.
OCAPS = Gst.Caps.from_string(
    'audio/x-raw, format=F32LE, layout=interleaved, rate=44100, channels=2'
)

# Frames per buffer: used to size the base-class blocksize and as the
# fallback frame count when do_fill() is called with length == -1.
SAMPLESPERBUFFER = 1024

# Initial values for the element's GObject properties.
DEFAULT_FREQ = 440
DEFAULT_VOLUME = 0.8
DEFAULT_MUTE = False
DEFAULT_IS_LIVE = False
|
|
|
|
class AudioTestSrc(GstBase.BaseSrc):
    """Source element that fills buffers with a sine wave.

    The pad template is built from OCAPS, and the negotiated caps are
    parsed into ``self.info`` in do_set_caps(). Frequency, volume, mute
    and liveness are exposed as GObject properties.
    """

    __gstmetadata__ = ('CustomSrc', 'Src',
                       'Custom test src element', 'Mathieu Duponchelle')

    # Each entry: (type, nick, blurb, [min, max,] default, flags).
    __gproperties__ = {
        "freq": (int,
                 "Frequency",
                 "Frequency of test signal",
                 1,
                 GLib.MAXINT,
                 DEFAULT_FREQ,
                 GObject.ParamFlags.READWRITE
                ),
        "volume": (float,
                   "Volume",
                   "Volume of test signal",
                   0.0,
                   1.0,
                   DEFAULT_VOLUME,
                   GObject.ParamFlags.READWRITE
                  ),
        "mute": (bool,
                 "Mute",
                 "Mute the test signal",
                 DEFAULT_MUTE,
                 GObject.ParamFlags.READWRITE
                ),
        "is-live": (bool,
                    "Is live",
                    "Whether to act as a live source",
                    DEFAULT_IS_LIVE,
                    GObject.ParamFlags.READWRITE
                   ),
    }

    __gsttemplates__ = Gst.PadTemplate.new("src",
                                           Gst.PadDirection.SRC,
                                           Gst.PadPresence.ALWAYS,
                                           OCAPS)

    def __init__(self):
        """Initialise property values and configure the base class."""
        GstBase.BaseSrc.__init__(self)
        # Filled in from the negotiated caps in do_set_caps().
        self.info = GstAudio.AudioInfo()

        self.freq = DEFAULT_FREQ
        self.volume = DEFAULT_VOLUME
        self.mute = DEFAULT_MUTE

        self.set_live(DEFAULT_IS_LIVE)
        self.set_format(Gst.Format.TIME)

    def do_set_caps(self, caps):
        """Parse *caps* into ``self.info`` and size buffers accordingly.

        Returns:
            True, unconditionally.
        """
        self.info.from_caps(caps)
        # One buffer holds SAMPLESPERBUFFER frames of ``bpf`` bytes each.
        self.set_blocksize(self.info.bpf * SAMPLESPERBUFFER)
        return True

    def do_get_property(self, prop):
        """Return the current value of the property described by *prop*.

        Raises:
            AttributeError: if ``prop.name`` is not one of the properties
                declared in ``__gproperties__``.
        """
        if prop.name == 'freq':
            return self.freq
        elif prop.name == 'volume':
            return self.volume
        elif prop.name == 'mute':
            return self.mute
        elif prop.name == 'is-live':
            # is_live is a method inherited from BaseSrc; it has to be
            # called, otherwise the bound method itself is returned.
            return self.is_live()
        else:
            raise AttributeError('unknown property %s' % prop.name)

    def do_set_property(self, prop, value):
        """Store *value* for the property described by *prop*.

        Raises:
            AttributeError: if ``prop.name`` is not one of the properties
                declared in ``__gproperties__``.
        """
        if prop.name == 'freq':
            self.freq = value
        elif prop.name == 'volume':
            self.volume = value
        elif prop.name == 'mute':
            self.mute = value
        elif prop.name == 'is-live':
            # Liveness is held by the base class, not on this instance.
            self.set_live(value)
        else:
            raise AttributeError('unknown property %s' % prop.name)

    def do_start(self):
        """Reset the running sample/byte/time counters and the phase.

        Returns:
            True, unconditionally.
        """
        self.next_sample = 0
        self.next_byte = 0
        self.next_time = 0
        self.accumulator = 0
        self.generate_samples_per_buffer = SAMPLESPERBUFFER

        return True

    # NOTE(review): the other overrides in this class are all named
    # do_<vfunc>; this name does not look like it matches BaseSrc's query
    # vfunc, so it is probably never invoked -- confirm before renaming.
    def do_gst_base_src_query(self, query):
        """Answer LATENCY queries; delegate everything else to BaseSrc.

        The reported minimum latency is the duration of one buffer of
        ``self.generate_samples_per_buffer`` frames at ``self.info.rate``.
        """
        if query.type == Gst.QueryType.LATENCY:
            latency = Gst.util_uint64_scale_int(
                self.generate_samples_per_buffer,
                Gst.SECOND, self.info.rate)
            # Call the method: the bare attribute is always truthy.
            is_live = self.is_live()
            query.set_latency(is_live, latency, Gst.CLOCK_TIME_NONE)
            res = True
        else:
            res = GstBase.BaseSrc.do_query(self, query)
        return res

    def do_get_times(self, buf):
        """Return the ``(start, end)`` times for *buf*.

        A live source reports the buffer's pts and pts + duration (each
        left at 0 when the corresponding field is CLOCK_TIME_NONE).
        A non-live source reports CLOCK_TIME_NONE for both.
        """
        end = 0
        start = 0
        # is_live must be called; testing the bound method is always
        # true and made the non-live branch below unreachable.
        if self.is_live():
            ts = buf.pts
            if ts != Gst.CLOCK_TIME_NONE:
                duration = buf.duration
                if duration != Gst.CLOCK_TIME_NONE:
                    end = ts + duration
                start = ts
        else:
            start = Gst.CLOCK_TIME_NONE
            end = Gst.CLOCK_TIME_NONE

        return start, end

    def do_fill(self, offset, length, buf):
        """Write the next chunk of the sine wave into *buf*.

        Args:
            offset: unused by this implementation.
            length: requested size in bytes, or -1 to produce
                SAMPLESPERBUFFER frames.
            buf: the buffer to map for writing and to timestamp.

        Returns:
            ``(Gst.FlowReturn.OK, buf)`` on success, or
            ``Gst.FlowReturn.ERROR`` if mapping or generation raised.
        """
        if length == -1:
            samples = SAMPLESPERBUFFER
        else:
            # Whole frames only; integer division avoids a float round trip.
            samples = length // self.info.bpf

        self.generate_samples_per_buffer = samples

        bytes_ = samples * self.info.bpf

        next_sample = self.next_sample + samples
        next_byte = self.next_byte + bytes_
        next_time = Gst.util_uint64_scale_int(next_sample, Gst.SECOND,
                                              self.info.rate)

        try:
            with buf.map(Gst.MapFlags.WRITE) as info:
                # View the mapped memory as a flat float32 array with one
                # entry per channel per frame.
                array = np.ndarray(shape=self.info.channels * samples,
                                   dtype=np.float32, buffer=info.data)
                if not self.mute:
                    # Repeat each frame index once per channel so every
                    # channel of a frame gets the same sample value.
                    r = np.repeat(
                        np.arange(self.accumulator,
                                  self.accumulator + samples),
                        self.info.channels)
                    np.sin(2 * np.pi * r * self.freq / self.info.rate,
                           out=array)
                    array *= self.volume
                else:
                    array[:] = 0
        except Exception as e:
            Gst.error("Mapping error: %s" % e)
            # NOTE(review): bare FlowReturn here versus a tuple on the
            # success path -- confirm which shape the binding expects.
            return Gst.FlowReturn.ERROR

        buf.offset = self.next_sample
        buf.offset_end = next_sample
        buf.pts = self.next_time
        buf.duration = next_time - self.next_time

        self.next_time = next_time
        self.next_sample = next_sample
        self.next_byte = next_byte
        # Advance the phase and wrap it to one period (rate / freq frames)
        # so the index fed to sin() stays small.
        self.accumulator += samples
        self.accumulator %= self.info.rate / self.freq

        return (Gst.FlowReturn.OK, buf)
|
|
|
|
|
|
# (element name, rank, element class) -- presumably picked up by the
# gst-python plugin loader to register the element; the name matches the
# one used in the example pipeline in the module docstring.
__gstelementfactory__ = (
    "py_audiotestsrc",
    Gst.Rank.NONE,
    AudioTestSrc,
)
|