Source code for zounds.spectral.frequencyadaptive



import numpy as np
from scipy.signal import resample

from .tfrepresentation import ExplicitFrequencyDimension, FrequencyDimension
from zounds.core import ArrayWithUnits
from zounds.timeseries import ConstantRateTimeSeries
from zounds.timeseries import Picoseconds, TimeDimension


[docs]class FrequencyAdaptive(ArrayWithUnits): """ TODO: This needs some love. Mutually exclusive constructor arguments are no bueno Args: arrs: TODO time_dimension (TimeDimension): the time dimension of the first axis of this array scale (FrequencyScale): The frequency scale corresponding to the first axis of this array, mutually exclusive with the :code:`explicit_freq_dimension` argument explicit_freq_dimension (ExplicitFrequencyDimension): TODO See Also: :class:`~zounds.spectral.FrequencyAdaptiveTransform` """ def __new__( cls, arrs, time_dimension=None, scale=None, explicit_freq_dimension=None): if not time_dimension: raise ValueError('time_dimension is required') if explicit_freq_dimension: if scale: raise ValueError( 'scale must be None when explicit_freq_dimension is supplied') if not isinstance(arrs, np.ndarray): raise ValueError( 'arrs must be a contiguous array when explicit_freq_dimension_is_supplied') return ArrayWithUnits.__new__( cls, arrs, [time_dimension, explicit_freq_dimension]) stops = list(np.cumsum([arr.shape[1] for arr in arrs])) slices = [slice(start, stop) for (start, stop) in zip([0] + stops, stops)] dimensions = [time_dimension, ExplicitFrequencyDimension(scale, slices)] array = np.concatenate(arrs, axis=1) return ArrayWithUnits.__new__(cls, array, dimensions) def kwargs(self): return dict( time_dimension=self.time_dimension, explicit_freq_dimension=self.frequency_dimension) @property def scale(self): return self.frequency_dimension.scale @property def time_dimension(self): return self.dimensions[0] @property def frequency_dimension(self): return self.dimensions[1] @property def n_bands(self): return len(self.scale) def rasterize(self, n_coeffs): return self.square(n_coeffs) def _resample(self, band, n_coeffs, epsilon=1e-8): rs = resample(band, n_coeffs, axis=1) # resample doesn't necessarily maintain the correct scale/magnitude, as # it isn't using the norm="ortho" argument when calling fft, so ensure # that the original scale/magnitude is maintained after calling # resample band_max = band.max(axis=-1, keepdims=True) rs_max = rs.max(axis=-1, keepdims=True) ratio = rs_max / (band_max + epsilon) normalized = rs / (ratio + epsilon) return np.asarray(normalized.flatten())
[docs] def square(self, n_coeffs, do_overlap_add=False): """ Compute a "square" view of the frequency adaptive transform, by resampling each frequency band such that they all contain the same number of samples, and performing an overlap-add procedure in the case where the sample frequency and duration differ :param n_coeffs: The common size to which each frequency band should be resampled """ resampled_bands = [ self._resample(band, n_coeffs) for band in self.iter_bands()] stacked = np.vstack(resampled_bands).T fdim = FrequencyDimension(self.scale) # TODO: This feels like it could be wrapped up nicely elsewhere chunk_frequency = Picoseconds(int(np.round( self.time_dimension.duration / Picoseconds(1) / n_coeffs))) td = TimeDimension(frequency=chunk_frequency) arr = ConstantRateTimeSeries(ArrayWithUnits( stacked.reshape(-1, n_coeffs, self.n_bands), dimensions=[self.time_dimension, td, fdim])) if not do_overlap_add: return arr # Begin the overlap add procedure overlap_ratio = self.time_dimension.overlap_ratio if overlap_ratio == 0: # no overlap add is necessary return ArrayWithUnits(stacked, [td, fdim]) step_size_samples = int(n_coeffs * overlap_ratio) first_dim = int(np.round( (stacked.shape[0] * overlap_ratio) + (n_coeffs * overlap_ratio))) output = ArrayWithUnits( np.zeros((first_dim, self.n_bands)), dimensions=[td, fdim]) for i, chunk in enumerate(arr): start = step_size_samples * i stop = start + n_coeffs output[start: stop] += chunk.reshape((-1, self.n_bands)) return output
def iter_bands(self): return (self[:, band] for band in self.scale) def like_dims(self, arr): return self.__class__( arr, time_dimension=self.time_dimension, explicit_freq_dimension=self.frequency_dimension) @classmethod def from_array_with_units(cls, arr): fdim = arr.dimensions[1] arrs = [arr[:, band] for band in fdim.scale] fa = FrequencyAdaptive(arrs, arr.dimensions[0], fdim.scale) return fa