import numpy as np
from zounds.nputil import sliding_window, windowed
from zounds.util import tuplify
from .dimensions import IdentityDimension
class CustomSlice(object):
def __init__(self):
super(CustomSlice, self).__init__()
[docs]class ArrayWithUnits(np.ndarray):
"""
`ArrayWithUnits` is an :class:`numpy.ndarray` subclass that allows for
indexing by more semantically meaningful slices.
It supports most methods on :class:`numpy.ndarray`, and makes a best-effort
to maintain meaningful dimensions throughout those operations.
Args:
arr (ndarray): The :class:`numpy.ndarray` instance containing the raw
data for this instance
dimensions (list or tuple): list or tuple of :class:`Dimension`-derived
classes
Raises:
ValueError: when `arr.ndim` and `len(dimensions)` do not match
Examples:
>>> from zounds import ArrayWithUnits, TimeDimension, Seconds, TimeSlice
>>> import numpy as np
>>> data = np.zeros(100)
>>> awu = ArrayWithUnits(data, [TimeDimension(Seconds(1))])
>>> sliced = awu[TimeSlice(Seconds(10))]
>>> sliced.shape
(10,)
See Also:
:class:`IdentityDimension`
:class:`~zounds.timeseries.TimeDimension`
:class:`~zounds.spectral.FrequencyDimension`
"""
def __new__(cls, arr, dimensions):
if arr.ndim != len(dimensions):
raise ValueError(
'arr.ndim and len(dimensions) must match. '
'They were {arr.shape} and {dimensions}'.format(**locals()))
obj = np.asarray(arr).view(cls)
obj.dimensions = tuple([IdentityDimension() if d is None else d.copy() for d in dimensions])
for dim, size in zip(obj.dimensions, obj.shape):
try:
dim.size = size
except AttributeError:
pass
try:
dim.validate(size)
except AttributeError:
pass
return obj
@property
def T(self):
arr = super(ArrayWithUnits, self).T
return ArrayWithUnits(arr, self.dimensions[::-1])
def kwargs(self):
return self.__dict__
def concatenate(self, other):
if self.dimensions == other.dimensions:
return self.from_example(np.concatenate([self, other]), self)
else:
raise ValueError('All dimensions must match to concatenate')
def reshape(self, shape, order='C'):
non_one = lambda x: abs(x) != 1
if tuple(shape) == tuple(filter(non_one, self.shape)):
# the new shape is this array's shape will all ones removed
return self.squeeze()
elif tuple(self.shape) == tuple(filter(non_one, shape)):
# the new shape just adds some single dimension axes
dims = iter(self.dimensions)
new_dims = [
IdentityDimension() if abs(size) == 1 else next(dims)
for size in shape
]
raw = np.asarray(self)
return ArrayWithUnits(raw.reshape(shape), new_dims)
# treat every axis as an IdentityDimension
raw = np.asarray(self)
return ArrayWithUnits(
np.reshape(raw, shape, order),
[IdentityDimension() for _ in shape])
def squeeze(self):
zipped = [x for x in zip(self.shape, self.dimensions) if x[0] > 1]
return ArrayWithUnits(
super().reshape([s for s, _ in zipped]),
[d for _, d in zipped])
@classmethod
def concat(cls, arrs, axis=0):
for arr in arrs[1:]:
if arr.dimensions != arrs[0].dimensions:
raise ValueError('All dimensions must match')
return cls.from_example(np.concatenate(arrs, axis=axis), arrs[0])
[docs] @classmethod
def from_example(cls, data, example):
"""
Produce a new :class:`ArrayWithUnits` instance given some raw data and
an example instance that has the desired dimensions
"""
return ArrayWithUnits(data, example.dimensions)
@classmethod
def zeros(cls, example):
return cls.from_example(
np.zeros(example.shape, dtype=example.dtype), example)
def zeros_like(self):
return self.__class__.zeros(self)
def _apply_reduction_to_dimensions(self, result, axis, keepdims):
if axis is None:
return result
ndims = len(self.dimensions)
reduced_axes = set([ndims + a if a < 0 else a for a in tuplify(axis)])
all_axes = set(range(ndims))
if keepdims:
new_dims = [
IdentityDimension() if i in reduced_axes else dim
for i, dim in enumerate(self.dimensions)]
else:
remaining_axes = sorted(all_axes - reduced_axes)
new_dims = [self.dimensions[i] for i in remaining_axes]
return ArrayWithUnits(result, new_dims)
def sum(self, axis=None, dtype=None, keepdims=False, **kwargs):
result = super(ArrayWithUnits, self).sum(
axis, dtype, keepdims=keepdims, **kwargs)
return self._apply_reduction_to_dimensions(result, axis, keepdims)
def max(self, axis=None, out=None, keepdims=False):
result = super(ArrayWithUnits, self).max(
axis=axis, out=out, keepdims=keepdims)
return self._apply_reduction_to_dimensions(result, axis, keepdims)
def dot(self, b):
result = super(ArrayWithUnits, self).dot(b)
return self.__class__(result, self.dimensions[:result.ndim])
def packbits(self, axis=None):
arr = np.packbits(self, axis=axis)
return ArrayWithUnits(arr, self.dimensions)
def __array_finalize__(self, obj):
if obj is None:
return
self.dimensions = getattr(obj, 'dimensions', None)
def __array_wrap__(self, obj, context=None):
if len(self.dimensions) != obj.ndim:
if obj.ndim == 0:
return obj.item()
return np.asarray(obj)
return np.ndarray.__array_wrap__(self, obj, context)
def sliding_window(self, windowsize, stepsize=None):
ws = tuple(self._compute_span(windowsize))
ss = tuple(self._compute_span(stepsize)) if stepsize else ws
result = sliding_window(self, ws, ss)
try:
new_dims = tuple(self._compute_new_dims(result, ws, ss))
except ValueError:
new_dims = [IdentityDimension()] * result.ndim
return ArrayWithUnits(result, new_dims)
def _sliding_window_integer_slices(self, windowsize, stepsize=None):
windowsize = [windowsize] + [slice(None) for _ in self.dimensions[1:]]
ws = tuple(self._compute_span(windowsize))
if stepsize:
stepsize = [stepsize] + [slice(None) for _ in self.dimensions[1:]]
ss = tuple(self._compute_span(stepsize))
else:
ss = ws
return ws, ss
def sliding_window_with_leftovers(
self, windowsize, stepsize=None, dopad=False):
ws, ss = \
self._sliding_window_integer_slices(windowsize, stepsize)
leftovers, result = windowed(self, ws[0], ss[0], dopad)
if not result.size:
return self, ArrayWithUnits(result, self.dimensions)
try:
new_dims = tuple(self._compute_new_dims(result, ws, ss))
except ValueError:
new_dims = [IdentityDimension()] * result.ndim
return leftovers, ArrayWithUnits(result, new_dims)
def _compute_new_dims(self, windowed, ws, ss):
for dimension, size, w, s in zip(self.dimensions, self.shape, ws, ss):
try:
modified = dimension.modified_dimension(size, w, s)
for m in modified:
yield m
except NotImplementedError:
yield dimension
def _compute_span(self, index):
for sl in self._compute_indices(index):
try:
yield sl.stop - sl.start
except (AttributeError, TypeError):
yield sl
def _is_integer_based_slice(self, sl):
if not isinstance(sl, slice):
return False
try:
return \
(sl.start is None or sl.start.bit_length()) \
and (sl.stop is None or sl.stop.bit_length())
except AttributeError:
return False
def _compute_indices(self, index):
dims_pos = 0
for sl in index:
if sl is None \
or isinstance(sl, int) \
or self._is_integer_based_slice(sl) \
or isinstance(sl, list):
# burn one
dims_pos += 1
yield sl
elif sl is Ellipsis:
dims_pos += len(self.dimensions) - (len(index) - 1)
yield Ellipsis
else:
dim = self.dimensions[dims_pos]
yield dim.integer_based_slice(sl)
dims_pos += 1
def _new_dims(self, index, new_arr):
dims_pos = 0
shape_pos = 0
not_ellipsis_or_none = len([x for x in index if x is not Ellipsis and x is not None])
for sl in index:
if sl is None:
# additional dimension via np.newaxis
yield IdentityDimension()
elif isinstance(sl, int):
# burn a dimension
dims_pos += 1
elif isinstance(sl, list):
dims_pos += 1
shape_pos += 1
yield IdentityDimension()
elif sl is Ellipsis:
ellipsis_size = len(self.dimensions) - not_ellipsis_or_none
for i in range(ellipsis_size):
yield self.dimensions[dims_pos]
dims_pos += 1
shape_pos += 1
else:
try:
dim = self.dimensions[dims_pos]
shape = new_arr.shape[shape_pos]
yield dim.metaslice(sl, shape)
except IndexError:
yield dim
dims_pos += 1
shape_pos += 1
# Return any leftover dimensions
for dim in self.dimensions[dims_pos:]:
yield dim
def _tuplify(self, a):
if isinstance(a, list):
t = set([x.__class__ for x in a])
if len(t) > 1:
raise ValueError('a must be homogeneous')
t = list(t)[0]
if t == slice:
return a
else:
return a,
if isinstance(a, np.ndarray) and a.dtype == np.bool:
return a,
try:
return tuple(a)
except TypeError:
return a,
def __getslice__(self, i, j):
return self.__getitem__(slice(i, j))
def __setitem__(self, index, value):
index = self._tuplify(index)
indices = tuple(self._compute_indices(index))
super(ArrayWithUnits, self).__setitem__(indices, value)
def __getitem__(self, index):
if isinstance(index, np.ndarray) \
and index.dtype == np.bool:
return np.asarray(self)[index]
if self.ndim == 1 and isinstance(index, int):
return np.asarray(self)[index]
index = self._tuplify(index)
indices = tuple(self._compute_indices(index))
arr = super(ArrayWithUnits, self).__getitem__(indices)
new_dims = tuple(self._new_dims(index, arr))
return ArrayWithUnits(arr, new_dims)