Source code for zounds.learn.random_samples

from featureflow import Node, NotEnoughData
from zounds.core import ArrayWithUnits, IdentityDimension
import numpy as np
from multiprocessing.pool import ThreadPool
from os import cpu_count


class Reservoir(object):
    def __init__(self, nsamples, dtype=None):
        super(Reservoir, self).__init__()

        if not isinstance(nsamples, int):
            raise ValueError('nsamples must be an integer')

        if nsamples <= 0:
            raise ValueError('nsamples must be greater than zero')

        self.nsamples = nsamples
        self.arr = None
        self.indices = set()
        self.dtype = dtype

    def __len__(self):
        return len(self.indices)

    def percent_full(self):
        return float(len(self)) / self.nsamples

    def _init_arr(self, samples):
        if self.arr is not None:
            return

        shape = (self.nsamples,) + samples.shape[1:]
        self.arr = np.zeros(shape, dtype=self.dtype or samples.dtype)

        try:
            self.arr = ArrayWithUnits(
                self.arr, (IdentityDimension(),) + samples.dimensions[1:])
        except AttributeError:
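            # samples were likely a plain numpy array, and not an
            # ArrayWithUnits instance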
            pass

    def add(self, samples, indices=None):
        self._init_arr(samples)

        if indices is None:
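            # draw a random slot for each incoming sample; colliding slots
            # are simply overwritten, which is what keeps the reservoir a
            # fixed size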
            indices = np.random.randint(0, self.nsamples, len(samples))

        if len(indices) != len(samples):
            raise ValueError(
                'number of input samples and indices must match'
                ' but they were {samples} and {indices} respectively'
                .format(samples=len(samples), indices=len(indices)))

        self.arr[indices, ...] = samples
        self.indices.update(indices)

    def get(self):
        if len(self.indices) == self.nsamples:
            return self.arr

        x = self.arr[sorted(self.indices), ...]
        return x

    def get_batch(self, batch_size):
        if batch_size > self.nsamples:
            raise ValueError(
                'Requested {batch_size} samples, but this instance can '
                'provide at most {nsamples}'
                .format(batch_size=batch_size, nsamples=self.nsamples))

        if batch_size > len(self.indices):
            raise ValueError(
                'Requested {batch_size} samples, but this instance only '
                'currently has {n} samples, with a maximum of {nsamples}'
                .format(
                    batch_size=batch_size,
                    n=len(self.indices),
                    nsamples=self.nsamples))

        # TODO: this would be much more efficient for repeated calls if I
        # instead maintained a sorted set
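        # note: np.random.choice samples with replacement by default, so a
        # batch may contain duplicate rows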
        indices = np.random.choice(list(self.indices), batch_size)
        return self.arr[indices, ...]

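# A usage sketch for Reservoir, illustrative only: the sample count and the
# (16, 8) chunk shape below are arbitrary assumptions, not part of zounds.
def _reservoir_usage_sketch():
    r = Reservoir(nsamples=100, dtype=np.float32)
    # feed chunks from a hypothetical stream until every slot has been
    # assigned at least once; slots are chosen at random, so this is a
    # coupon-collector process rather than a fixed number of calls
    while r.percent_full() < 1:
        r.add(np.random.normal(0, 1, (16, 8)))
    return r.get_batch(32)  # 32 randomly-chosen rows, shape (32, 8)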

class MultiplexedReservoir(object):
    def __init__(self, nsamples, dtype=None):
        super(MultiplexedReservoir, self).__init__()
        self.dtype = dtype
        self.reservoir = None
        self.nsamples = nsamples

    def _init_dict(self, samples):
        if self.reservoir is not None:
            return

        self.reservoir = dict(
            (k, Reservoir(self.nsamples, dtype=self.dtype))
            for k in samples.keys())

    def _check_sample_keys(self, samples):
        if set(self.reservoir.keys()) != set(samples.keys()):
            raise ValueError(
                'samples should have keys {keys}'
                .format(keys=list(self.reservoir.keys())))

    def add(self, samples):
        self._init_dict(samples)
        self._check_sample_keys(samples)

        indices = None
        for k, v in samples.items():
            if indices is None:
                indices = np.random.randint(0, self.nsamples, len(v))

            self.reservoir[k].add(v, indices=indices)

    def get(self):
        return dict((k, v.get()) for k, v in self.reservoir.items())

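# A usage sketch for MultiplexedReservoir, illustrative only: the keys and
# shapes are arbitrary assumptions. One indices array is drawn per add()
# call and shared across all keys, so row i of every returned array comes
# from the same position in the input stream.
def _multiplexed_usage_sketch():
    r = MultiplexedReservoir(nsamples=100)
    for _ in range(200):
        r.add({
            'spectrogram': np.random.normal(0, 1, (16, 8)),
            'label': np.random.randint(0, 10, (16, 1))
        })
    return r.get()  # dict of row-aligned arrays, one per key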

class ShuffledSamples(Node):
    def __init__(
            self,
            nsamples=None,
            multiplexed=False,
            dtype=None,
            needs=None):
        super(ShuffledSamples, self).__init__(needs=needs)
        self.reservoir = MultiplexedReservoir(nsamples, dtype=dtype) \
            if multiplexed else Reservoir(nsamples, dtype=dtype)

    def _enqueue(self, data, pusher):
        self.reservoir.add(data)

    def _dequeue(self):
        if not self._finalized:
            raise NotEnoughData()

        return self.reservoir.get()


class InfiniteSampler(Node):
    def __init__(
            self,
            nsamples=None,
            multiplexed=False,
            dtype=None,
            needs=None,
            feature_filter=lambda x: x,
            parallel=True):
        super(InfiniteSampler, self).__init__(needs=needs)
        self.parallel = parallel
        self.feature_filter = feature_filter
        self.multiplexed = multiplexed
        self.reservoir = MultiplexedReservoir(nsamples, dtype=dtype) \
            if multiplexed else Reservoir(nsamples, dtype=dtype)

    def _total_samples(self, cls, feature, _ids):
        pool = ThreadPool(cpu_count())
        feature_filter = self.feature_filter

        def x(_id):
            f = feature(_id=_id, persistence=cls)
            filtered = feature_filter(f)
            return len(filtered)

        if self.parallel:
            total_samples = sum(pool.imap_unordered(x, _ids))
        else:
            total_samples = sum(map(x, _ids))

        return total_samples

    def _update_reservoir(self, _id, cls, feature, total_samples):
        # fetch the features from a single document
        x = feature(_id=_id, persistence=cls)
        x = self.feature_filter(x)

        # compute the contribution this document makes to the dataset at
        # large
        feature_size = len(x)
        ratio = float(feature_size) / total_samples

        # determine the appropriate number of samples to contribute to the
        # reservoir
        nsamples = max(1, int(self.reservoir.nsamples * ratio))
        print('Contributing', feature_size, ratio, nsamples)

        # select an appropriately-sized random subset of the feature. this
        # will be shuffled again as it is added to the reservoir, but it
        # ensures that samples are drawn evenly from the duration of the
        # sound
        indices = np.random.randint(0, feature_size, nsamples)
        self.reservoir.add(x[indices, ...])
        return len(indices)

    def _process(self, data):
        cls, feature = data

        # compute the total number of samples in our dataset
        _ids = list(cls.database.iter_ids())
        total_samples = self._total_samples(cls, feature, _ids)
        print('Total samples', total_samples)

        while True:
            if self.parallel:
                pool = ThreadPool(cpu_count())
                list(pool.imap_unordered(
                    lambda _id: self._update_reservoir(
                        _id, cls, feature, total_samples),
                    _ids))
            else:
                for _id in _ids:
                    self._update_reservoir(_id, cls, feature, total_samples)

            yield self.reservoir.get()


class ReservoirSampler(Node):
    """
    Use reservoir sampling (http://en.wikipedia.org/wiki/Reservoir_sampling)
    to draw a fixed-size set of random samples from a stream of unknown size.

    This is useful when the samples can fit into memory, but the stream
    cannot.
    """

    def __init__(self, nsamples=None, wrapper=None, needs=None):
        super(ReservoirSampler, self).__init__(needs=needs)
        if wrapper:
            raise DeprecationWarning('wrapper is no longer used or needed')
        self._nsamples = int(nsamples)
        self._r = None
        self._index = 0

    # TODO: What happens if we have filled up all the sample slots and we run
    # out of data?
    def _enqueue(self, data, pusher):
        if self._r is None:
            shape = (self._nsamples,) + data.shape[1:]
            self._r = np.zeros(shape, dtype=data.dtype)
            try:
                self._r = ArrayWithUnits(
                    self._r, (IdentityDimension(),) + data.dimensions[1:])
            except AttributeError:
                # samples were likely a plain numpy array, and not an
                # ArrayWithUnits instance
                pass

        diff = 0
        if self._index < self._nsamples:
            diff = self._nsamples - self._index
            available = len(data[:diff])
            self._r[self._index: self._index + available] = data[:diff]
            self._index += available

        remaining = len(data[diff:])
        if not remaining:
            return

        # draw from [0, self._index] inclusive; the deprecated
        # np.random.random_integers call used here previously did the same
        indices = np.random.randint(0, self._index + 1, size=remaining)
        indices = indices[indices < self._nsamples]
        self._r[indices, ...] = data[diff:][list(range(len(indices)))]
        self._index += remaining

    def _dequeue(self):
        if not self._finalized:
            raise NotEnoughData()

        if self._index <= self._nsamples:
            arr = np.asarray(self._r[:self._index])
            np.random.shuffle(arr)
            if isinstance(self._r, ArrayWithUnits):
                arr = ArrayWithUnits(arr, self._r.dimensions)
            return arr

        return self._r
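

# A minimal, standalone sketch (not part of the zounds API) of the classic
# reservoir-sampling update rule described in the docstring above, often
# called Algorithm R: after n items have streamed past, each item occupies
# the k-slot reservoir with equal probability k / n. The names stream and k
# are illustrative.
import random


def reservoir_sample(stream, k):
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            # fill the reservoir with the first k items
            reservoir.append(item)
        else:
            # keep item with probability k / (i + 1), evicting a random slot
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir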