diff --git a/ChangeLog b/ChangeLog index 403410b2c0..24bc9580f7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2006-09-12 Edward Hervey + + * gst/extend/jukebox.py: + * gst/extend/leveller.py: + * gst/extend/sources.py: + Revival of the jukebox (and leveller) using 0.10 and gnonlin. + Still needs a bit of love, but functionnal enough. + 2006-09-10 Edward Hervey Patch by: Rene Stadler diff --git a/gst/extend/jukebox.py b/gst/extend/jukebox.py new file mode 100644 index 0000000000..3dbd3b9246 --- /dev/null +++ b/gst/extend/jukebox.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python +# -*- Mode: Python -*- +# vi:si:et:sw=4:sts=4:ts=4 +# +# GStreamer python bindings +# Copyright (C) 2005 Edward Hervey +# Copyright (C) 2005 Thomas Vander Stichele + +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import sys +import pickle +import random as rand + +import gobject +import pygst +pygst.require('0.10') +import gst + +import utils +from pygobject import gsignal +import sources +from leveller import Leveller + +class Jukebox(gst.Bin): + gsignal('done', str) + gsignal('prerolled') # emitted when at least 2 sources are ready + gsignal('changed', str, gobject.TYPE_UINT64) # clocktime, filename + gsignal('looped') + + def __init__(self, files, rms=0.2, loops=0, random=False, + caps="audio/x-raw-int,channels=2,rate=44100", + picklepath='level.pck'): + # with pygtk 2.4 this call is needed for the gsignal to work + self.__gobject_init__() + + self._target_rms = rms + self._loopsleft = loops + self._loopsdone = 0 + self._random = random + self._picklepath = picklepath + self._caps = gst.caps_from_string(caps) + self._files = files[:] # copy + self._levels = {} # filename -> rms, mixin, mixout, length + self._prerolled = False + self._playing = False + self._scani = 0 # index into self._files for scanning + self._playi = 0 # index into self._files for playing + + self._lastadded = None # last file added to composition + self._lastposition = long(0) # last position where file was added + + if not len(files) > 1: + raise TypeError, 'Must have at least 2 files' + + self._composition = gst.element_factory_make("gnlcomposition") + self._composition.connect('pad-added', self._composition_pad_added_cb) + self.add(self._composition) + + self._srcpad = None + + # load our pickle if it exists + if os.path.exists(self._picklepath): + file = open(self._picklepath) + self._levels = pickle.load(file) + file.close() + + # randomize our list if asked for + if self._random: + self._files = rand.sample(self._files, len(self._files)) + + + ## public API + + def preroll(self): + # scan the first few files and start playing + gst.debug("starting jukebox prerolling") + self._scan() + + def start(self): + ## + ## FIXME : THIS SHOULD'T BE NEEDED ! + ## USE STATE CHANGES INSTEAD + ## + if not self._prerolled: + raise Exception, "baby" + self.set_state(gst.STATE_PAUSED) + + + ## Scanning private methods + + def _scan(self): + # start a leveller for a new _toscan file + if self._scani >= len(self._files): + gst.debug("We're done scanning !") + return + + file = self._files[self._scani] + self._scani += 1 + + if file in self._levels.keys(): + gst.debug("already did file %s" % file) + self._check_prerolled() + gobject.timeout_add(0, self._scan) + return + + gst.debug("creating leveller for %s" % file) + leveller = Leveller(file) + leveller.connect('done', self._leveller_done_cb, file) + gobject.timeout_add(0, leveller.start) + ##gobject.idle_add(leveller.iterate) + + def _leveller_done_cb(self, l, reason, file): + if reason != sources.EOS: + gst.debug("Error: %s" % reason) + return + + gst.debug("in: %s, out: %s" % (gst.TIME_ARGS(l.mixin), + gst.TIME_ARGS(l.mixout))) + gst.debug("rms: %f, %f dB" % (l.rms, l.rmsdB)) + + # store infos + self._levels[file] = (l.rms, l.mixin, l.mixout, l.length) + + gst.debug("writing level pickle") + file = open(self._picklepath, "w") + pickle.dump(self._levels, file) + file.close() + + self._check_prerolled() + self._scan() + + # clean up leveller after this handler + gobject.timeout_add(0, l.clean) + + + ## GnlSource-related methods + + def _new_gnl_source(self, location, start): + """ + Creates a new GnlSource containing an AudioSource with the following + properties correctly set: + _ volume level + _ priority + _ duration + The start position MUST be given + """ + if not self._levels[location]: + return None + self.debug("Creating new GnlSource at %s for %s" % (gst.TIME_ARGS(start), location)) + idx = self._files.index(location) + self._loopsdone * len(self._files) + rms, mixin, mixout, duration = self._levels[location] + gnls = gst.element_factory_make("gnlsource", "source-%d-%s" % (idx, location)) + src = sources.AudioSource(location) + gnls.add(src) + + # set volume + level = 1.0 + if rms > self._target_rms: + level = self._target_rms / rms + gst.debug('setting volume of %f' % level) + else: + gst.debug('not going to go above 1.0 level') + src.set_volume(level) + + # set proper position/duration/priority in composition + gnls.props.priority = (2 * self._loopsdone) + 1 + (idx % 2) + gnls.props.start = long(start) + gnls.props.duration = long(duration) + gnls.props.media_duration = long(duration) + gnls.props.media_start = long(0) + + return gnls + + def _new_mixer(self, start, duration): + gnlo = gst.element_factory_make("gnloperation") + ad = gst.element_factory_make("adder") + gnlo.add(ad) + gnlo.props.sinks = 2 + gnlo.props.start = start + gnlo.props.duration = duration + gnlo.props.priority = 0 + + return gnlo + + def _append_file(self, location): + """ + Appends the given file to the composition, along with the proper mixer effect + """ + self.debug("location:%s" % location) + start = self._lastposition + if self._lastadded: + start += self._levels[self._lastadded][2] + start -= self._levels[location][1] + + gnls = self._new_gnl_source(location, start) + self._composition.add(gnls) + + if self._lastadded: + # create the mixer + duration = self._levels[self._lastadded][3] - self._levels[self._lastadded][2] + self._levels[location][1] + mixer = self._new_mixer(start, duration) + self._composition.add(mixer) + + self._lastposition = start + self._lastadded = location + + self.debug("lastposition:%s , lastadded:%s" % (gst.TIME_ARGS(self._lastposition), + self._lastadded)) + + def _check_prerolled(self): + gst.debug("_check_prerolled: index: scan %d, play %d" % ( + self._scani, self._playi)) + if not self._prerolled and self._scani > self._playi + 1: + self._prerolled = True + # add initial sources here + self._append_file(self._files[0]) + self._append_file(self._files[1]) + self.debug("now prerolled and ready to play") + self.emit('prerolled') + + + def _emit_changed(self, file, when): + print "emitting changed for %s at %r" % (file, when) + self.emit('changed', file, when) + + def _source_clean(self, source): + source.set_state(gst.STATE_NULL) + self.remove(source) + source.clean() + + ## composition callbacks + + def _composition_pad_added_cb(self, comp, pad): + if self._srcpad: + return + self.debug("Ghosting source pad %s" % pad) + self._srcpad = gst.GhostPad("src", pad) + self.add_pad(self._srcpad) + + ## gst.Bin/Element virtual methods + + def do_handle_message(self, message): + self.debug("got message %s / %s / %r" % (message.src.get_name(), message.type.first_value_name, message)) + + # chaining up + gst.Bin.do_handle_message(self, message) + + def do_state_change(self, transition): + if not self._prerolled: + gst.error("Call Jukebox.preroll() before!") + return gst.STATE_CHANGE_FAILURE + # chaining up + return gst.Bin.do_state_change(self, message) + +gobject.type_register(Jukebox) + +# helper functions +def _find_elements_recurse(element): + if not isinstance(element, gst.Bin): + return [element, ] + l = [] + for e in element.elements(): + l.extend(_find_elements_recurse(e)) + return l + +def _find_unconnected_pad(bin, direction): + for e in _find_elements_recurse(bin): + for p in e.pads(): + if p.get_direction() == direction and not p.get_peer(): + return p + + return None + +# run us to test +if __name__ == "__main__": + main = gobject.MainLoop() + pipeline = gst.Pipeline('jukebox') + list = open(sys.argv[1]).read().rstrip().split('\n') + print list + #source = Jukebox(list, random=True, loops=-1) + source = Jukebox(list, random=True, loops=1) + + def _jukebox_prerolled_cb(jukebox): + print "prerolled" + _start() + + def _jukebox_changed_cb(jukebox, filename, when): + print "changed file to %s at %s" % (filename, float(when) / gst.TIME_ARGS(gst.SECOND)) + + def _jukebox_looped_cb(jukebox): + print "jukebox looped" + + def _start(): + source.start() + print "setting pipeline to PLAYING" + pipeline.set_state(gst.STATE_PLAYING) + print "set pipeline to PLAYING" + + def jukebox_pad_added(comp, pad, sinkpad): + pad.link(sinkpad) + + def jukebox_message(bus, message): + if message.type == gst.MESSAGE_ERROR: + print "Error: %s" % message.parse_error() + main.quit() + elif message.type == gst.MESSAGE_EOS: + print "done" + main.quit() + + source.connect('prerolled', _jukebox_prerolled_cb) + source.connect('changed', _jukebox_changed_cb) + source.connect('looped', _jukebox_looped_cb) + source.preroll() + pipeline.add(source) + + bus = pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", jukebox_message) + + p = "alsasink" + if len(sys.argv) > 2: + p = " ".join(sys.argv[2:]) + + print "parsing output pipeline %s" % p + sinkbin = gst.parse_launch("bin.( %s )" % p) + pipeline.add(sinkbin) + apad = _find_unconnected_pad(sinkbin, gst.PAD_SINK) + if not apad: + raise TypeError, "No unconnected sink pad found in bin %r" % sinkbin + sinkpad = gst.GhostPad("sink", apad) + sinkbin.add_pad(sinkpad) + source.connect('pad-added', jukebox_pad_added, sinkpad) + + print "Going into main loop" + sys.stdout.flush() + main.run() + print "Left main loop" + sys.stdout.flush() + + pipeline.set_state(gst.STATE_NULL) diff --git a/gst/extend/leveller.py b/gst/extend/leveller.py new file mode 100644 index 0000000000..5fd759715a --- /dev/null +++ b/gst/extend/leveller.py @@ -0,0 +1,285 @@ +# -*- Mode: Python -*- +# vi:si:et:sw=4:sts=4:ts=4 +# +# GStreamer python bindings +# Copyright (C) 2005 Thomas Vander Stichele + +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import sys +import math + +import gobject +import pygst +pygst.require('0.10') +import gst + +import utils +from pygobject import gsignal + +import sources +from sources import EOS, ERROR, UNKNOWN_TYPE, WRONG_TYPE + +class Leveller(gst.Pipeline): + """ + I am a pipeline that calculates RMS values and mix-in/out points. + I will signal 'done' when I'm done scanning the file, with return value + EOS, ERROR, UNKNOWN_TYPE, or WRONG_TYPE from gst.extend.sources + """ + + gsignal('done', str) + + def __init__(self, filename, threshold=-9.0): + gst.Pipeline.__init__(self) + + self._filename = filename + + self._thresholddB = threshold + self._threshold = math.pow(10, self._thresholddB / 10.0) + + self._source = sources.AudioSource(filename) + self._source.connect('done', self._done_cb) + + self._level = gst.element_factory_make("level") + + self._fakesink = gst.element_factory_make("fakesink") + + self.add(self._source, self._level, self._fakesink) + self._source.connect("pad-added", self._sourcePadAddedCb) + self._level.link(self._fakesink) + + # temporary values for each timepoint + self._rmsdB = {} # hash of channel, rmsdB value + self._peakdB = 0.0 # highest value over all channels for this time + + # results over the whole file + self._meansquaresums = [] # list of time -> mean square sum value + self._peaksdB = [] # list of time -> peak value + + self._lasttime = 0 + + # will be set when done + self.mixin = 0 + self.mixout = 0 + self.length = 0 + self.rms = 0.0 + self.rmsdB = 0.0 + + def _sourcePadAddedCb(self, source, pad): + self._source.link(self._level) + + def do_handle_message(self, message): + self.debug("got message %r" % message) + if (message.type == gst.MESSAGE_ELEMENT) and (message.src == self._level): + struc = message.structure + endtime = struc["endtime"] + rmss = struc["rms"] + peaks = struc["peak"] + decays = struc["decay"] + infos = zip(rmss, peaks, decays) + channid = 0 + for rms,peak,decay in infos: + self._level_cb(message.src, endtime, channid, rms, peak, decay) + channid += 1 + elif message.type == gst.MESSAGE_EOS: + self._eos_cb(message.src) + # chaining up + gst.Pipeline.do_handle_message(self, message) + + def _level_cb(self, element, time, channel, rmsdB, peakdB, decaydB): + # rms is being signalled in dB + # FIXME: maybe level should have a way of signalling actual values + # signals are received in order, so I should get each channel one + # by one + if time > self._lasttime and self._lasttime > 0: + # we have a new time point, so calculate stuff for the old block + meansquaresum = 0.0 + for i in self._rmsdB.keys(): + meansquaresum += math.pow(10, self._rmsdB[i] / 10.0) + # average over channels + meansquaresum /= len(self._rmsdB.keys()) + try: + rmsdBstr = str(10 * math.log10(meansquaresum)) + except OverflowError: + rmsdBstr = "(-inf)" + gst.log("meansquaresum %f (%s dB)" % (meansquaresum, rmsdBstr)) + + # update values + self._peaksdB.append((self._lasttime, peakdB)) + self._meansquaresums.append((self._lasttime, meansquaresum)) + self._rmsdB = {} + self._peakdB = 0.0 + + # store the current values for later processing + gst.log("time %s, channel %d, rmsdB %f" % (gst.TIME_ARGS(time), channel, rmsdB)) + self._lasttime = time + self._rmsdB[channel] = rmsdB + if peakdB > self._peakdB: + self._peakdB = peakdB + + def _done_cb(self, source, reason): + gst.debug("done, reason %s" % reason) + # we ignore eos because we want the whole pipeline to eos + if reason == EOS: + return + self.emit('done', reason) + + def _eos_cb(self, source): + gst.debug("eos, start calcing") + + # get the highest peak RMS for this track + highestdB = self._peaksdB[0][1] + + for (time, peakdB) in self._peaksdB: + if peakdB > highestdB: + highestdB = peakdB + gst.debug("highest peak(dB): %f" % highestdB) + + # get the length + (self.length, peakdB) = self._peaksdB[-1] + + # find the mix in point + for (time, peakdB) in self._peaksdB: + gst.log("time %s, peakdB %f" % (gst.TIME_ARGS(time), peakdB)) + if peakdB > self._thresholddB + highestdB: + gst.debug("found mix-in point of %f dB at %s" % ( + peakdB, gst.TIME_ARGS(time))) + self.mixin = time + break + + # reverse and find out point + self._peaksdB.reverse() + found = None + for (time, peakdB) in self._peaksdB: + if found: + self.mixout = time + gst.debug("found mix-out point of %f dB right before %s" % ( + found, gst.TIME_ARGS(time))) + break + + if peakdB > self._thresholddB + highestdB: + found = peakdB + + # now calculate RMS between these two points + weightedsquaresums = 0.0 + lasttime = self.mixin + for (time, meansquaresum) in self._meansquaresums: + if time <= self.mixin: + continue + + delta = time - lasttime + weightedsquaresums += meansquaresum * delta + gst.log("added MSS %f over time %s at time %s, now %f" % ( + meansquaresum, gst.TIME_ARGS(delta), + gst.TIME_ARGS(time), weightedsquaresums)) + + lasttime = time + + if time > self.mixout: + break + + # calculate + try: + ms = weightedsquaresums / (self.mixout - self.mixin) + except ZeroDivisionError: + # this is possible when, for example, the whole sound file is + # empty + gst.warning('ZeroDivisionError on %s, mixin %s, mixout %s' % ( + self._filename, gst.TIME_ARGS(self.mixin), + gst.TIME_ARGS(self.mixout))) + self.emit('done', WRONG_TYPE) + return + + self.rms = math.sqrt(ms) + self.rmsdB = 10 * math.log10(ms) + + self.emit('done', EOS) + + def start(self): + gst.debug("Setting to PLAYING") + self.set_state(gst.STATE_PLAYING) + gst.debug("Set to PLAYING") + + # FIXME: we might want to do this ourselves automatically ? + def stop(self): + """ + Stop the leveller, freeing all resources. + Call after the leveller emitted 'done' to clean up. + """ + gst.debug("Setting to NULL") + self.set_state(gst.STATE_NULL) + gst.debug("Set to NULL") + utils.gc_collect('Leveller.stop()') + + def clean(self): + # clean ourselves up completely + self.stop() + # let's be ghetto and clean out our bin manually + self.remove(self._source) + self.remove(self._level) + self.remove(self._fakesink) + gst.debug("Emptied myself") + self._source.clean() + utils.gc_collect('Leveller.clean() cleaned up source') + self._source = None + self._fakesink = None + self._level = None + utils.gc_collect('Leveller.clean() done') + +gobject.type_register(Leveller) + +if __name__ == "__main__": + main = gobject.MainLoop() + + try: + leveller = Leveller(sys.argv[1]) + except IndexError: + sys.stderr.write("Please give a file to calculate level of\n") + sys.exit(1) + + print "Starting" + bus = leveller.get_bus() + bus.add_signal_watch() + dontstop = True + + leveller.set_state(gst.STATE_PLAYING) + + while dontstop: + message = bus.poll(gst.MESSAGE_ANY, gst.SECOND) + if message: + gst.debug("got message from poll:%s/%r" % (message.type, message)) + else: + gst.debug("got NOTHING from poll") + if message: + if message.type == gst.MESSAGE_EOS: + print "in: %s, out: %s, length: %s" % (gst.TIME_ARGS(leveller.mixin), + gst.TIME_ARGS(leveller.mixout), + gst.TIME_ARGS(leveller.length)) + print "rms: %f, %f dB" % (leveller.rms, leveller.rmsdB) + dontstop = False + elif message.type == gst.MESSAGE_ERROR: + error,debug = message.parse_error() + print "ERROR[%s] %s" % (error.domain, error.message) + dontstop = False + + leveller.stop() + leveller.clean() + + gst.debug('deleting leveller, verify objects are freed') + utils.gc_collect('quit main loop') + del leveller + utils.gc_collect('deleted leveller') + gst.debug('stopping forever') diff --git a/gst/extend/sources.py b/gst/extend/sources.py new file mode 100644 index 0000000000..ffeaf556b1 --- /dev/null +++ b/gst/extend/sources.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +# -*- Mode: Python -*- +# vi:si:et:sw=4:sts=4:ts=4 +# +# GStreamer python bindings +# Copyright (C) 2005 Edward Hervey + +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import sys + +import gobject +import pygst +pygst.require('0.10') +import gst + +from pygobject import gsignal + +EOS = 'EOS' +ERROR = 'ERROR' +WRONG_TYPE = 'WRONG_TYPE' +UNKNOWN_TYPE = 'UNKNOWN_TYPE' + +class AudioSource(gst.Bin): + """A bin for audio sources with proper audio converters""" + + gsignal('done', str) + gsignal('prerolled') + + def __init__(self, filename, caps="audio/x-raw-int,channels=2,rate=44100"): + # with pygtk 2.4 this call is needed for the gsignal to work + gst.Bin.__init__(self) + + self.filename = filename + self.outcaps = caps + + self.filesrc = gst.element_factory_make("filesrc") + self.filesrc.set_property("location", self.filename) + self.dbin = gst.element_factory_make("decodebin") + self.audioconvert = gst.element_factory_make("audioconvert") + self.audioresample = gst.element_factory_make("audioresample") + self.volume = gst.element_factory_make("volume") + + self.add(self.filesrc, self.dbin, + self.audioconvert, self.audioresample, self.volume) + self.filesrc.link(self.dbin) + self.audioconvert.link(self.audioresample) + self.audioresample.link(self.volume, caps) + + self.dbin.connect("new-decoded-pad", self._new_decoded_pad_cb) + self.dbin.connect("unknown-type", self._unknown_type_cb) + + self._srcpad = None + + def __repr__(self): + return "" % self.filename + + def set_volume(self, volume): + gst.debug("setting volume to %f" % volume) + self.volume.set_property("volume", volume) + + def _new_decoded_pad_cb(self, dbin, pad, is_last): + gst.debug("new decoded pad: pad %r [%s]" % (pad, pad.get_caps().to_string())) + if not "audio" in pad.get_caps().to_string() or self._srcpad: + return + + gst.debug("linking pad %r to audioconvert" % pad) + pad.link(self.audioconvert.get_pad("sink")) + + self._srcpad = gst.GhostPad("src", self.volume.get_pad("src")) + self.add_pad(self._srcpad) + + def _unknown_type_cb(self, pad, caps): + self.emit('done', UNKNOWN_TYPE) + + def stop(self): + self.set_state(gst.STATE_NULL) + + def clean(self): + self.stop() + self.remove(self.filesrc) + self.remove(self.dbin) + self.remove(self.audioconvert) + self.remove(self.audioresample) + self.remove(self.volume) + self.filesrc = None + self.dbin = None + self.audioconvert = None + self.volume = None + +gobject.type_register(AudioSource) + + +# run us to test +if __name__ == "__main__": + main = gobject.MainLoop() + + def _done_cb(source, reason): + print "Done" + sys.stdout.flush() + if reason != EOS: + print "Some error happened: %s" % reason + main.quit() + + def _error_cb(source, element, gerror, message): + print "Error: %s" % gerror + main.quit() + + try: + source = AudioSource(sys.argv[1]) + except IndexError: + sys.stderr.write("Please give a filename to play\n") + sys.exit(1) + + pipeline = gst.Pipeline("playing") + # connecting on the source also catches eos emit when + # no audio pad + source.connect('done', _done_cb) + pipeline.connect('error', _error_cb) + + p = "osssink" + if len(sys.argv) > 2: + p = " ".join(sys.argv[2:]) + + pipeline.add(source) + sink = gst.parse_launch(p) + pipeline.add(sink) + source.link(sink) + + # we schedule this as a timeout so that we are definately in the main + # loop when it goes to PLAYING, and errors make main.quit() work correctly + def _start(pipeline): + print "setting pipeline to PLAYING" + pipeline.set_state(gst.STATE_PLAYING) + print "set pipeline to PLAYING" + + gobject.timeout_add(0, _start, pipeline) + gobject.idle_add(pipeline.iterate) + + print "Going into main loop" + main.run() + print "Left main loop" + + pipeline.set_state(gst.STATE_NULL) + pipeline.remove(source) + pipeline.remove(sink) + utils.gc_collect('cleaned out pipeline') + source.clean() + utils.gc_collect('cleaned up source') + source = None + utils.gc_collect('set source to None')