Source code for zounds.learn.preprocess

from featureflow import Node, NotEnoughData
import marshal
import types
from collections import OrderedDict
import hashlib
import numpy as np
from .functional import hyperplanes
from zounds.loudness import log_modulus, inverse_log_modulus


class Op(object):
    def __init__(self, func, **kwargs):
        super(Op, self).__init__()
        self._kwargs = kwargs
        try:
            self._func = marshal.dumps(func.__code__)
        except AttributeError:
            # func is already a marshalled function
            self._func = func
        self._version = self._compute_version()

    @property
    def version(self):
        return self._version

    @property
    def kwargs(self):
        return self._kwargs

    def _compute_version(self):
        h = hashlib.md5(self._func)
        for v in self._kwargs.values():
            try:
                h.update(v.version)
            except AttributeError:
                h.update(np.array(v))
        return h.hexdigest()

    def __getattr__(self, key):
        if key == '_kwargs':
            raise AttributeError()

        try:
            return self._kwargs[key]
        except KeyError:
            raise AttributeError(key)

    def __call__(self, arg, **kwargs):
        code = marshal.loads(self._func)
        f = types.FunctionType(
            code,
            globals(),
            'preprocess')
        kwargs.update(self._kwargs)
        return f(arg, **kwargs)


[docs]class PreprocessResult(object): """ `PreprocessResult` are the output of :class:`Preprocessor` nodes, and can participate in a `Pipeline`. Args: data: the data on which the node in the graph was originally trained op (Op): a callable that can transform data inversion_data: data extracted in the forward pass of the model, that can be used to invert the result inverse (Op): a callable that given the output of `op`, and `inversion_data`, can invert the result """ def __init__(self, data, op, inversion_data=None, inverse=None, name=None): super(PreprocessResult, self).__init__() self.name = name self.inverse = inverse self.inversion_data = inversion_data self.data = data self.op = op def __str__(self): return 'PreprocessResult(name={name})'.format(**self.__dict__) def __repr__(self): return self.__str__() def __getattr__(self, key): if key == 'op': raise AttributeError() return getattr(self.op, key) def for_storage(self): return PreprocessResult( None, self.op, self.inversion_data, self.inverse, self.name)
[docs]class Preprocessor(Node): """ `Preprocessor` is the common base class for nodes in a processing graph that will produce :class:`PreprocessingResult` instances that end up as part of a :class:`Pipeline`. Args: needs (Node): previous processing node(s) on which this one depends for its data See Also: :class:`PreprocessResult` :class:`PreprocessingPipeline` :class:`PipelineResult` """ def __init__(self, needs=None): super(Preprocessor, self).__init__(needs=needs) def _forward_func(self): """ Return a function that represents this processor's forward transform """ def x(data, example_arg=None, another_one=None): return data return x def transform(self, **kwargs): return Op(self._forward_func(), **kwargs) def _inversion_data(self): """ Return a function that computes any data needed for an inverse transform, if one is possible. Otherwise, raise NotImplemented """ def x(data, **kwargs): return kwargs return x def inversion_data(self, **kwargs): return Op(self._inversion_data(), **kwargs) def _backward_func(self): """ Return a function that computes this processor's inverse transform, if one is possible. Otherwise, raise NotImplemented """ def x(data, **inversion_args): return data return x def inverse_transform(self): return Op(self._backward_func()) def _extract_data(self, data): if isinstance(data, PreprocessResult): return data.data elif isinstance(data, dict): return dict((k, self._extract_data(v)) for k, v in data.items()) else: return data
[docs]class UnitNorm(Preprocessor): def __init__(self, needs=None): super(UnitNorm, self).__init__(needs=needs) def _forward_func(self): def x(d): from zounds.core import ArrayWithUnits from .functional import example_wise_unit_norm normed, norms = example_wise_unit_norm(d, return_norms=True) try: normed = ArrayWithUnits(normed, d.dimensions) except AttributeError: pass return ForwardResult(normed, norm=norms) return x def _backward_func(self): def x(d, norm=None): dim_diff = d.ndim - norm.ndim new_dims = (1,) * dim_diff return d * norm.reshape(norm.shape + new_dims) return x def _process(self, data): data = self._extract_data(data) op = self.transform() inv_data = self.inversion_data() inv = self.inverse_transform() try: forward_result = op(data) data = forward_result.output_value except AttributeError: data = None yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='UnitNorm')
class BasePreprocessor(Preprocessor): def __init__(self, forward=None, backward=None, name=None, needs=None): super(BasePreprocessor, self).__init__(needs=needs) def not_implemented(): raise NotImplementedError() self.forward = forward self.backward = backward or not_implemented self.name = name def _forward_func(self): return self.forward def _backward_func(self): return self.backward def _process(self, data): data = self._extract_data(data) op = self.transform() inv_data = self.inversion_data() inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name=self.name) # TODO: what about functions with imports that aren't in the calling namespace? # TODO: what about functions that require inversion data? # TODO: what about functions that need to do some processing first (e.g. SimHash) class Log(BasePreprocessor): def __init__(self, needs=None): super(Log, self).__init__( forward=log_modulus, backward=inverse_log_modulus, name='Log', needs=needs)
[docs]class MuLawCompressed(Preprocessor): def __init__(self, needs=None): super(MuLawCompressed, self).__init__(needs=needs) def _forward_func(self): def x(d): from zounds.loudness import mu_law return mu_law(d) return x def _backward_func(self): def x(d): from zounds.loudness import inverse_mu_law return inverse_mu_law(d) return x def _process(self, data): data = self._extract_data(data) op = self.transform() inv_data = self.inversion_data() inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='MuLawCompressed')
class Slicer(Preprocessor): def __init__(self, slicex=None, needs=None): super(Slicer, self).__init__(needs=needs) self.fill_func = np.zeros self.slicex = slicex def _forward_func(self): def x(d, slicex=None): return d[..., slicex] return x def _inversion_data(self): def y(d, slicex=None, fill_func=None): try: ka = d.kwargs() def ff(shape, dtype): return d.__class__(fill_func(shape, dtype), **ka) except AttributeError: def ff(shape, dtype): return fill_func(shape, dtype) return dict(shape=d.shape, slicex=slicex, fill_func=ff) return y def _backward_func(self): def z(d, shape=None, fill_func=None, slicex=None): new_shape = d.shape[:1] + shape[1:] new_arr = fill_func(new_shape, d.dtype) new_arr[..., slicex] = d return new_arr return z def _process(self, data): data = self._extract_data(data) op = self.transform(slicex=self.slicex) inv_data = self.inversion_data( fill_func=self.fill_func, slicex=self.slicex) inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Slicer') class Reshape(Preprocessor): def __init__(self, new_shape, needs=None): super(Reshape, self).__init__(needs=needs) self.new_shape = new_shape def _forward_func(self): def x(d, new_shape=None): return d.reshape(d.shape[:1] + new_shape) return x def _inversion_data(self): def x(d): return dict(original_shape=d.shape) return x def _backward_func(self): def x(d, original_shape=None): return d.reshape(original_shape) return x def _process(self, data): data = self._extract_data(data) op = self.transform(new_shape=self.new_shape) inv_data = self.inversion_data() inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Reshape')
[docs]class SimHash(Preprocessor): """ Hash feature vectors by computing on which side of N hyperplanes those features lie. Args: bits (int): The number of hyperplanes, and hence, the number of bits in the resulting hash packbits (bool): Should the result be bit-packed? needs (Preprocessor): the processing node on which this node relies for its data """ def __init__(self, bits=None, packbits=False, needs=None): super(SimHash, self).__init__(needs=needs) self.packbits = packbits self.bits = bits def _forward_func(self): def x(d, plane_vectors=None, packbits=None): from zounds.core import ArrayWithUnits, IdentityDimension from zounds.learn import simhash import numpy as np bits = simhash(plane_vectors, d) if packbits: bits = np.packbits(bits, axis=-1).view(np.uint64) try: return ArrayWithUnits( bits, [d.dimensions[0], IdentityDimension()]) except AttributeError: return bits return x def _backward_func(self): def x(d): raise NotImplementedError() return x def _process(self, data): data = self._extract_data(data) mean = data.mean(axis=0).flatten() std = data.std(axis=0).flatten() plane_vectors = hyperplanes(mean, std, self.bits) op = self.transform(plane_vectors=plane_vectors, packbits=self.packbits) inv_data = self.inversion_data() inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='SimHash')
[docs]class MeanStdNormalization(Preprocessor): def __init__(self, needs=None): super(MeanStdNormalization, self).__init__(needs=needs) def _forward_func(self): def x(d, mean=None, std=None): import numpy as np return np.divide(d - mean, std, where=std != 0) return x def _backward_func(self): def x(d, mean=None, std=None): arr = d.copy() arr *= std arr += mean return arr return x def _process(self, data): data = self._extract_data(data) mean = data.mean(axis=0) std = data.std(axis=0) op = self.transform(mean=mean, std=std) inv_data = self.inversion_data(mean=mean, std=std) inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='MeanStd')
class ForwardResult(object): def __init__(self, output_value, **inversion_data): self.output_value = output_value self.inversion_data = inversion_data
[docs]class InstanceScaling(Preprocessor): def __init__(self, max_value=1, needs=None): super(InstanceScaling, self).__init__(needs=needs) self.max_value = max_value def _forward_func(self): def x(d, max_value=None): import numpy as np axes = tuple(range(1, len(d.shape))) # TODO: it should be possible to hand this intermediate value off # to inversion data, and to ignore the extra m = np.max(np.abs(d), axis=axes, keepdims=True) output = max_value * np.divide(d, m, where=m != 0) return ForwardResult(output, max=m, max_value=max_value) return x def _backward_func(self): def x(d, max=None, max_value=None): return (d * max) / max_value return x def _process(self, data): data = self._extract_data(data) op = self.transform(max_value=self.max_value) inv_data = self.inversion_data() inv = self.inverse_transform() forward_result = op(data) data = forward_result.output_value yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='InstanceScaling')
class Multiply(Preprocessor): def __init__(self, factor=1, needs=None): super(Multiply, self).__init__(needs=needs) self.factor = factor def _forward_func(self): def x(d, factor=None): return d * factor return x def _backward_func(self): def x(d, factor=None): return d * (1.0 / factor) return x def _process(self, data): data = self._extract_data(data) op = self.transform(factor=self.factor) inv_data = self.inversion_data(factor=self.factor) inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Multiply')
[docs]class Weighted(Preprocessor): def __init__(self, weighting, needs=None): super(Weighted, self).__init__(needs=needs) self.weighting = weighting def _forward_func(self): def x(d, weighting=None): return d * weighting return x def _backward_func(self): def x(d, weighting=None): return d / weighting return x def _process(self, data): data = self._extract_data(data) op = self.transform(weighting=self.weighting) inv_data = self.inversion_data(weighting=self.weighting) inv = self.inverse_transform() data = op(data) yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Weighted')
class AbsoluteValue(Preprocessor): def __init__(self, needs=None): super(AbsoluteValue, self).__init__(needs=needs) def _forward_func(self): def x(d): import numpy as np from zounds.core import ArrayWithUnits processed = np.abs(d) try: return ArrayWithUnits(processed, d.dimensions) except AttributeError: return processed return x def _backward_func(self): def x(d): raise NotImplementedError() return x def _process(self, data): data = self._extract_data(data) op = self.transform() inv_data = self.inversion_data() inv = self.inverse_transform() try: data = op(data) except: data = None yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='AbsoluteValue') class Sharpen(Preprocessor): def __init__(self, kernel=None, needs=None): super(Sharpen, self).__init__(needs=needs) self.kernel = kernel def _forward_func(self): def x(d, kernel=None): from scipy.signal import convolve from zounds.core import ArrayWithUnits data = convolve(d, kernel[None, ...], mode='same') data[data < 0] = 0 try: return ArrayWithUnits(data, d.dimensions) except AttributeError: return data return x def _backward_func(self): def x(d): raise NotImplementedError() return x def _process(self, data): data = self._extract_data(data) op = self.transform(kernel=self.kernel) inv_data = self.inversion_data() inv = self.inverse_transform() try: data = op(data) except: data = None yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Sharpen') class Binarize(Preprocessor): def __init__(self, threshold=0.5, needs=None): super(Binarize, self).__init__(needs=needs) self.threshold = threshold def _forward_func(self): def x(d, threshold=None): import numpy as np from zounds.core import ArrayWithUnits data = np.zeros(d.shape, dtype=np.uint8) data[np.where(d > threshold)] = 1 try: return ArrayWithUnits(data, d.dimensions) except AttributeError: return data return x def _backward_func(self): def x(d): raise NotImplementedError() return x def _process(self, data): data = self._extract_data(data) op = self.transform(threshold=self.threshold) inv_data = self.inversion_data() inv = self.inverse_transform() try: data = op(data) except: data = None yield PreprocessResult( data, op, inversion_data=inv_data, inverse=inv, name='Binarize')
[docs]class Pipeline(object): """ """ def __init__(self, preprocess_results): self.processors = list(preprocess_results) versions = ''.join([p.op.version for p in self.processors]) self.version = hashlib.md5(versions.encode()).hexdigest() def __getitem__(self, index): if isinstance(index, int): return self.processors[index] elif isinstance(index, list): return Pipeline([self.processors[i] for i in index]) return Pipeline(self.processors[index]) def wrap_data(self, data): cls = data.__class__ try: kwargs = data.kwargs() except AttributeError: kwargs = None return cls, kwargs def transform(self, data, wrapper=None): inversion_data = [] wrap_data = [] for p in self.processors: wrap_data.append(self.wrap_data(data)) processed_data = p.op(data) if isinstance(processed_data, ForwardResult): inv_data = processed_data.inversion_data processed_data = processed_data.output_value else: inv_data = p.inversion_data(data) inversion_data.append(inv_data) data = processed_data if wrapper is not None: data = wrapper(data) return PipelineResult(data, self.processors, inversion_data, wrap_data)
[docs]class PipelineResult(object): def __init__(self, data, processors, inversion_data, wrap_data): super(PipelineResult, self).__init__() self.processors = processors[::-1] self.inversion_data = inversion_data[::-1] self.wrap_data = wrap_data[::-1] self.data = data def unwrap(self, data, wrap_data): cls, kwargs = wrap_data if kwargs is None: return data return cls(data, **kwargs) def inverse_transform(self): data = self.data for inv_data, wrap_data, p in \ zip(self.inversion_data, self.wrap_data, self.processors): data = p.inverse(data, **inv_data) data = self.unwrap(data, wrap_data) return data
[docs]class PreprocessingPipeline(Node): """ A `PreprocessingPipeline` is a node in the graph that can be connected to one or more :class:`Preprocessor` nodes, whose output it will assemble into a re-usable pipeline. Args: needs (list or tuple of Node): the :class:`Preprocessor` nodes on whose output this pipeline depends Here's an example of a learning pipeline that will first find the feature-wise mean and standard deviation of a dataset, and will then learn K-Means clusters from the dataset. This will result in a re-usable pipeline that can use statistics from the original dataset to normalize new examples, assign them to a cluster, and finally, reconstruct them. .. code:: python import featureflow as ff import zounds from random import choice samplerate = zounds.SR44100() STFT = zounds.stft(resample_to=samplerate) @zounds.simple_in_memory_settings class Sound(STFT): bark = zounds.ArrayWithUnitsFeature( zounds.BarkBands, samplerate=samplerate, needs=STFT.fft, store=True) @zounds.simple_in_memory_settings class ExamplePipeline(ff.BaseModel): docs = ff.PickleFeature( ff.IteratorNode, needs=None) shuffled = ff.PickleFeature( zounds.ShuffledSamples, nsamples=100, needs=docs, store=False) meanstd = ff.PickleFeature( zounds.MeanStdNormalization, needs=docs, store=False) kmeans = ff.PickleFeature( zounds.KMeans, needs=meanstd, centroids=32) pipeline = ff.PickleFeature( zounds.PreprocessingPipeline, needs=(meanstd, kmeans), store=True) # apply the Sound processing graph to individual audio files for metadata in zounds.InternetArchive('TheR.H.SFXLibrary'): print 'processing {url}'.format(url=metadata.request.url) Sound.process(meta=metadata) # apply the ExamplePipeline processing graph to the entire corpus of audio _id = ExamplePipeline.process(docs=(snd.bark for snd in Sound)) learned = ExamplePipeline(_id) snd = choice(list(Sound)) result = learned.pipeline.transform(snd.bark) print result.data # print the assigned centroids for each FFT frame inverted = result.inverse_transform() print inverted # the reconstructed FFT frames See Also: :class:`Pipeline` :class:`Preprocessor` :class:`PreprocessResult` :class:`PipelineResult` """ def __init__(self, needs=None): super(PreprocessingPipeline, self).__init__(needs=needs) self._init_pipeline() def _init_pipeline(self): self._pipeline = OrderedDict((id(n), None) for n in list(self.needs.values())) def _enqueue(self, data, pusher): self._pipeline[id(pusher)] = data def _dequeue(self): if not all(self._pipeline.values()): raise NotEnoughData() pipeline = Pipeline([x.for_storage() for x in iter(self._pipeline.values())]) self._init_pipeline() return pipeline