# Mirror of https://gitlab.freedesktop.org/gstreamer/gstreamer.git
# (synced 2025-01-10 17:35:59 +00:00; 286 lines, 9.7 KiB, Python)
|
# -*- Mode: Python -*-
|
||
|
# vi:si:et:sw=4:sts=4:ts=4
|
||
|
#
|
||
|
# GStreamer python bindings
|
||
|
# Copyright (C) 2005 Thomas Vander Stichele <thomas at apestaart dot org>
|
||
|
|
||
|
# This library is free software; you can redistribute it and/or
|
||
|
# modify it under the terms of the GNU Lesser General Public
|
||
|
# License as published by the Free Software Foundation; either
|
||
|
# version 2.1 of the License, or (at your option) any later version.
|
||
|
#
|
||
|
# This library is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||
|
# Lesser General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Lesser General Public
|
||
|
# License along with this library; if not, write to the Free Software
|
||
|
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import math
|
||
|
|
||
|
import gobject
|
||
|
import pygst
|
||
|
pygst.require('0.10')
|
||
|
import gst
|
||
|
|
||
|
import utils
|
||
|
from pygobject import gsignal
|
||
|
|
||
|
import sources
|
||
|
from sources import EOS, ERROR, UNKNOWN_TYPE, WRONG_TYPE
|
||
|
|
||
|
class Leveller(gst.Pipeline):
    """
    I am a pipeline that calculates RMS values and mix-in/out points.
    I will signal 'done' when I'm done scanning the file, with return value
    EOS, ERROR, UNKNOWN_TYPE, or WRONG_TYPE from gst.extend.sources

    Once 'done' was emitted with EOS, the results can be read from my
    attributes mixin, mixout, length, rms and rmsdB.
    """

    gsignal('done', str)

    def __init__(self, filename, threshold=-9.0):
        """
        Build the source -> level -> fakesink chain for one file.

        filename:  handed to sources.AudioSource
        threshold: level in dB, relative to the highest peak found in the
                   file, that a block's peak has to exceed to be picked as
                   mix-in or mix-out point
        """
        gst.Pipeline.__init__(self)

        self._filename = filename

        self._thresholddB = threshold
        self._threshold = math.pow(10, self._thresholddB / 10.0)

        self._source = sources.AudioSource(filename)
        self._source.connect('done', self._done_cb)

        self._level = gst.element_factory_make("level")

        self._fakesink = gst.element_factory_make("fakesink")

        self.add(self._source, self._level, self._fakesink)
        # the source is linked to level from the pad-added callback
        self._source.connect("pad-added", self._sourcePadAddedCb)
        self._level.link(self._fakesink)

        # temporary values for each timepoint
        self._rmsdB = {}     # hash of channel, rmsdB value
        # highest value over all channels for this time; None until the
        # first channel of a block was seen, so that any dB value (they
        # are not necessarily positive) can become the maximum
        self._peakdB = None

        # results over the whole file
        self._meansquaresums = []  # list of time -> mean square sum value
        self._peaksdB = []         # list of time -> peak value

        self._lasttime = 0

        # will be set when done
        self.mixin = 0
        self.mixout = 0
        self.length = 0
        self.rms = 0.0
        self.rmsdB = 0.0

    def _sourcePadAddedCb(self, source, pad):
        """Link the source to the level element when it adds a pad."""
        self._source.link(self._level)

    def do_handle_message(self, message):
        """
        Intercept messages before chaining up to gst.Pipeline.

        Element messages coming from my level element are split up per
        channel and fed to _level_cb; an EOS message triggers the final
        calculation in _eos_cb.
        """
        self.debug("got message %r" % message)
        if (message.type == gst.MESSAGE_ELEMENT) and (message.src == self._level):
            struc = message.structure
            endtime = struc["endtime"]
            # one entry per channel in each of these fields
            infos = zip(struc["rms"], struc["peak"], struc["decay"])
            for channid, (rms, peak, decay) in enumerate(infos):
                self._level_cb(message.src, endtime, channid, rms, peak, decay)
        elif message.type == gst.MESSAGE_EOS:
            self._eos_cb(message.src)
        # chaining up
        gst.Pipeline.do_handle_message(self, message)

    def _flush_block(self):
        """
        Move the per-channel values collected for the block ending at
        self._lasttime into the whole-file result lists.

        Does nothing when no values are pending.
        """
        if not self._rmsdB:
            return

        # rms values arrive in dB; convert back and average over channels
        meansquaresum = sum(math.pow(10, value / 10.0)
                            for value in self._rmsdB.values())
        meansquaresum /= len(self._rmsdB)
        try:
            rmsdBstr = str(10 * math.log10(meansquaresum))
        except (OverflowError, ValueError):
            # log10(0) raises one or the other depending on Python version
            rmsdBstr = "(-inf)"
        gst.log("meansquaresum %f (%s dB)" % (meansquaresum, rmsdBstr))

        # update values; the peak stored is the maximum collected for this
        # block, not the peak of the channel that triggered the flush
        self._peaksdB.append((self._lasttime, self._peakdB))
        self._meansquaresums.append((self._lasttime, meansquaresum))
        self._rmsdB = {}
        self._peakdB = None

    def _level_cb(self, element, time, channel, rmsdB, peakdB, decaydB):
        """
        Collect the values of one channel for the block ending at time.

        The first channel of a newer time point flushes the values
        collected for the previous block.
        """
        # rms is being signalled in dB
        # FIXME: maybe level should have a way of signalling actual values
        # signals are received in order, so I should get each channel one
        # by one
        if time > self._lasttime and self._lasttime > 0:
            # we have a new time point, so calculate stuff for the old block
            self._flush_block()

        # store the current values for later processing
        gst.log("time %s, channel %d, rmsdB %f" % (gst.TIME_ARGS(time), channel, rmsdB))
        self._lasttime = time
        self._rmsdB[channel] = rmsdB
        if self._peakdB is None or peakdB > self._peakdB:
            self._peakdB = peakdB

    def _done_cb(self, source, reason):
        """Forward the source's 'done' signal, unless the reason is EOS."""
        gst.debug("done, reason %s" % reason)
        # we ignore eos because we want the whole pipeline to eos
        if reason == EOS:
            return
        self.emit('done', reason)

    def _eos_cb(self, source):
        """
        Calculate length, mix-in/mix-out points and the RMS between them,
        then emit 'done'.

        'done' is emitted with EOS on success, and with WRONG_TYPE when no
        level values were collected or no range with a positive duration
        could be found between mix-in and mix-out.
        """
        gst.debug("eos, start calcing")

        # no newer time point will arrive to flush the last block
        self._flush_block()

        if not self._peaksdB:
            gst.warning('no level values collected on %s' % self._filename)
            self.emit('done', WRONG_TYPE)
            return

        # get the highest peak RMS for this track
        highestdB = max(peakdB for (time, peakdB) in self._peaksdB)
        gst.debug("highest peak(dB): %f" % highestdB)

        # get the length
        self.length = self._peaksdB[-1][0]

        limitdB = self._thresholddB + highestdB

        # find the mix in point
        for (time, peakdB) in self._peaksdB:
            gst.log("time %s, peakdB %f" % (gst.TIME_ARGS(time), peakdB))
            if peakdB > limitdB:
                gst.debug("found mix-in point of %f dB at %s" % (
                    peakdB, gst.TIME_ARGS(time)))
                self.mixin = time
                break

        # walk backwards and find out point; the time point visited right
        # after the first loud one is taken.  Compare against None: a peak
        # of exactly 0.0 dB is a valid find.
        found = None
        for (time, peakdB) in reversed(self._peaksdB):
            if found is not None:
                self.mixout = time
                gst.debug("found mix-out point of %f dB right before %s" % (
                    found, gst.TIME_ARGS(time)))
                break

            if peakdB > limitdB:
                found = peakdB

        # now calculate RMS between these two points
        weightedsquaresums = 0.0
        lasttime = self.mixin
        for (time, meansquaresum) in self._meansquaresums:
            if time <= self.mixin:
                continue

            delta = time - lasttime
            weightedsquaresums += meansquaresum * delta
            gst.log("added MSS %f over time %s at time %s, now %f" % (
                meansquaresum, gst.TIME_ARGS(delta),
                gst.TIME_ARGS(time), weightedsquaresums))

            lasttime = time

            if time > self.mixout:
                break

        # calculate
        span = self.mixout - self.mixin
        if span <= 0:
            # this is possible when, for example, the whole sound file is
            # empty, or when no mix-out point was found
            gst.warning('no usable range on %s, mixin %s, mixout %s' % (
                self._filename, gst.TIME_ARGS(self.mixin),
                gst.TIME_ARGS(self.mixout)))
            self.emit('done', WRONG_TYPE)
            return

        ms = weightedsquaresums / span
        self.rms = math.sqrt(ms)
        if ms > 0:
            self.rmsdB = 10 * math.log10(ms)
        else:
            # complete silence between the two points
            self.rmsdB = float("-inf")

        self.emit('done', EOS)

    def start(self):
        """Set the pipeline to PLAYING."""
        gst.debug("Setting to PLAYING")
        self.set_state(gst.STATE_PLAYING)
        gst.debug("Set to PLAYING")

    # FIXME: we might want to do this ourselves automatically ?
    def stop(self):
        """
        Stop the leveller, freeing all resources.
        Call after the leveller emitted 'done' to clean up.
        """
        gst.debug("Setting to NULL")
        self.set_state(gst.STATE_NULL)
        gst.debug("Set to NULL")
        utils.gc_collect('Leveller.stop()')

    def clean(self):
        """
        Stop, remove all my elements, clean the source and drop my
        references to the elements.
        """
        # clean ourselves up completely
        self.stop()
        # let's be ghetto and clean out our bin manually
        self.remove(self._source)
        self.remove(self._level)
        self.remove(self._fakesink)
        gst.debug("Emptied myself")
        self._source.clean()
        utils.gc_collect('Leveller.clean() cleaned up source')
        self._source = None
        self._fakesink = None
        self._level = None
        utils.gc_collect('Leveller.clean() done')
|
||
|
|
||
|
# Register the Leveller class with the GObject type system.
# NOTE(review): presumably this is what makes the 'done' signal declared
# with gsignal() and the do_handle_message() override take effect -- confirm
# against the pygobject documentation.
gobject.type_register(Leveller)
|
||
|
|
||
|
if __name__ == "__main__":
    # Command-line driver: level the file named by the first argument and
    # print its mix-in/mix-out points, length and RMS.
    main = gobject.MainLoop()

    try:
        leveller = Leveller(sys.argv[1])
    except IndexError:
        sys.stderr.write("Please give a file to calculate level of\n")
        sys.exit(1)

    print("Starting")
    bus = leveller.get_bus()
    bus.add_signal_watch()

    leveller.set_state(gst.STATE_PLAYING)

    # poll the bus ourselves until the pipeline reports EOS or an error
    running = True
    while running:
        message = bus.poll(gst.MESSAGE_ANY, gst.SECOND)
        if not message:
            gst.debug("got NOTHING from poll")
            continue

        gst.debug("got message from poll:%s/%r" % (message.type, message))

        if message.type == gst.MESSAGE_EOS:
            times = (
                gst.TIME_ARGS(leveller.mixin),
                gst.TIME_ARGS(leveller.mixout),
                gst.TIME_ARGS(leveller.length),
            )
            print("in: %s, out: %s, length: %s" % times)
            print("rms: %f, %f dB" % (leveller.rms, leveller.rmsdB))
            running = False
        elif message.type == gst.MESSAGE_ERROR:
            error, debug = message.parse_error()
            print("ERROR[%s] %s" % (error.domain, error.message))
            running = False

    leveller.stop()
    leveller.clean()

    # collect around the delete so freed objects can be verified
    gst.debug('deleting leveller, verify objects are freed')
    utils.gc_collect('quit main loop')
    del leveller
    utils.gc_collect('deleted leveller')
    gst.debug('stopping forever')
|