from .frequencyscale import LinearScale, FrequencyBand, ExplicitScale
from .tfrepresentation import FrequencyDimension
from .frequencyadaptive import FrequencyAdaptive
from zounds.timeseries import \
audio_sample_rate, TimeSlice, Seconds, TimeDimension, HalfLapped, \
Milliseconds, SampleRate
from zounds.core import ArrayWithUnits, IdentityDimension
from .sliding_window import \
IdentityWindowingFunc, HanningWindowingFunc, WindowingFunc, \
OggVorbisWindowingFunc
from zounds.loudness import log_modulus, unit_scale
import numpy as np
from scipy.signal import resample, firwin2
from matplotlib import cm
from scipy.signal import hann, morlet
from itertools import repeat
from zounds.nputil import sliding_window
[docs]def fft(x, axis=-1, padding_samples=0):
"""
Apply an FFT along the given dimension, and with the specified amount of
zero-padding
Args:
x (ArrayWithUnits): an :class:`~zounds.core.ArrayWithUnits` instance
which has one or more :class:`~zounds.timeseries.TimeDimension`
axes
axis (int): The axis along which the fft should be applied
padding_samples (int): The number of padding zeros to apply along
axis before performing the FFT
"""
if padding_samples > 0:
padded = np.concatenate(
[x, np.zeros((len(x), padding_samples), dtype=x.dtype)],
axis=axis)
else:
padded = x
transformed = np.fft.rfft(padded, axis=axis, norm='ortho')
sr = audio_sample_rate(int(Seconds(1) / x.dimensions[axis].frequency))
scale = LinearScale.from_sample_rate(sr, transformed.shape[-1])
new_dimensions = list(x.dimensions)
new_dimensions[axis] = FrequencyDimension(scale)
return ArrayWithUnits(transformed, new_dimensions)
def stft(x, window_sample_rate=HalfLapped(), window=HanningWindowingFunc()):
duration = TimeSlice(window_sample_rate.duration)
frequency = TimeSlice(window_sample_rate.frequency)
if x.ndim == 1:
_, arr = x.sliding_window_with_leftovers(
duration, frequency, dopad=True)
elif x.ndim == 2 and isinstance(x.dimensions[0], IdentityDimension):
arr = x.sliding_window((1, duration), (1, frequency))
td = x.dimensions[-1]
dims = [IdentityDimension(), TimeDimension(*window_sample_rate), td]
arr = ArrayWithUnits(arr.reshape((len(x), -1, arr.shape[-1])), dims)
else:
raise ValueError(
'x must either have a single TimeDimension, or '
'(IdentityDimension, TimeDimension)')
window = window or IdentityWindowingFunc()
windowed = arr * window._wdata(arr.shape[-1])
return fft(windowed)
def mdct(data):
l = data.shape[-1] // 2
t = np.arange(0, 2 * l)
f = np.arange(0, l)
cpi = -1j * np.pi
a = data * np.exp(cpi * t / 2 / l)
b = np.fft.fft(a)
c = b[..., :l]
transformed = np.sqrt(2 / l) * np.real(
c * np.exp(cpi * (f + 0.5) * (l + 1) / 2 / l))
return transformed
def imdct(frames):
l = frames.shape[-1]
t = np.arange(0, 2 * l)
f = np.arange(0, l)
cpi = -1j * np.pi
a = frames * np.exp(cpi * (f + 0.5) * (l + 1) / 2 / l)
b = np.fft.fft(a, 2 * l)
return np.sqrt(2 / l) * np.real(b * np.exp(cpi * t / 2 / l))
def time_stretch(x, factor, frame_sample_rate=None):
if frame_sample_rate is None:
sr = HalfLapped()
sr = SampleRate(frequency=sr.frequency / 2, duration=sr.duration)
else:
sr = frame_sample_rate
hop_length, window_length = sr.discrete_samples(x)
win = WindowingFunc(windowing_func=hann)
# to simplify, let's always compute the stft in "batch" mode
if x.ndim == 1:
x = x.reshape((1,) + x.shape)
D = stft(x, sr, win)
n_fft_coeffs = D.shape[-1]
n_frames = D.shape[1]
n_batches = D.shape[0]
time_steps = np.arange(0, n_frames, factor, dtype=np.float)
weights = np.mod(time_steps, 1.0)
exp_phase_advance = np.linspace(0, np.pi * hop_length, n_fft_coeffs)
# pad in the time dimension, so no edge/end frames are left out
# coeffs = np.pad(D, [(0, 0), (0, 2), (0, 0)], mode='constant')
shape = list(D.shape)
shape[1] += 2
coeffs = np.zeros(shape, dtype=D.dtype)
coeffs[:, :-2, :] = D
coeffs_mags = np.abs(coeffs)
coeffs_phases = np.angle(coeffs)
# we need a phase accumulator for every item in the batch
phase_accum = coeffs_phases[:, :1, :]
sliding_indices = np.vstack([time_steps, time_steps + 1]).T.astype(np.int32)
windowed_mags = coeffs_mags[:, sliding_indices, :]
windowed_phases = coeffs_phases[:, sliding_indices, :]
first_mags = windowed_mags[:, :, 0, :]
second_mags = windowed_mags[:, :, 1, :]
first_phases = windowed_phases[:, :, 0, :]
second_phases = windowed_phases[:, :, 1, :]
# compute all the phase stuff
two_pi = 2.0 * np.pi
dphase = (second_phases - first_phases - exp_phase_advance)
dphase -= two_pi * np.round(dphase / two_pi)
dphase += exp_phase_advance
all_phases = np.concatenate([phase_accum, dphase], axis=1)
dphase = np.cumsum(all_phases, axis=1, out=all_phases)
dphase = dphase[:, :-1, :]
# linear interpolation of FFT coefficient magnitudes
weights = weights[None, :, None]
mags = ((1.0 - weights) * first_mags) + (weights * second_mags)
# combine magnitudes and phases
new_coeffs = mags * np.exp(1.j * dphase)
# synthesize the new frames
new_frames = np.fft.irfft(new_coeffs, axis=-1, norm='ortho')
# new_frames = new_frames * win._wdata(new_frames.shape[-1])
new_frames = np.multiply(
new_frames, win._wdata(new_frames.shape[-1]), out=new_frames)
# overlap add the new audio samples
new_n_samples = int(x.shape[-1] / factor)
output = np.zeros((n_batches, new_n_samples), dtype=x.dtype)
for i in range(new_frames.shape[1]):
start = i * hop_length
stop = start + new_frames.shape[-1]
l = output[:, start: stop].shape[1]
output[:, start: stop] += new_frames[:, i, :l]
return ArrayWithUnits(output, [IdentityDimension(), x.dimensions[-1]])
def pitch_shift(x, semitones, frame_sample_rate=None):
original_shape = x.shape[1] if x.ndim == 2 else x.shape[0]
# first, perform a time stretch so that the audio will have the desired
# pitch
factor = 2.0 ** (-float(semitones) / 12.0)
stretched = time_stretch(x, factor, frame_sample_rate=frame_sample_rate)
# hang on to original dimensions
dimensions = stretched.dimensions
# window the audio using a power-of-2 frame size for more efficient FFT
# computations
batch_size = stretched.shape[0]
window_size = 1024
step = (1, window_size)
new_window_shape = int(window_size * factor)
padding = window_size - int(stretched.shape[-1] % window_size)
stretched = np.pad(stretched, ((0, 0), (0, padding)), mode='constant')
windowed = sliding_window(stretched, step, step, flatten=False).squeeze()
# resample the audio so that it has the correct duration
rs = resample(windowed, new_window_shape, axis=-1)
# flatten out the windowed, resampled audio
rs = rs.reshape(batch_size, -1)
# slice the audio to remove residual zeros resulting from our power-of-2
# zero padding above
rs = rs[:, :original_shape]
return ArrayWithUnits(rs, dimensions)
def phase_shift(coeffs, samplerate, time_shift, axis=-1, frequency_band=None):
frequency_dim = coeffs.dimensions[axis]
if not isinstance(frequency_dim, FrequencyDimension):
raise ValueError(
'dimension {axis} of coeffs must be a FrequencyDimension instance, '
'but was {cls}'.format(axis=axis, cls=frequency_dim.__class__))
n_coeffs = coeffs.shape[axis]
shift_samples = int(time_shift / samplerate.frequency)
shift = (np.arange(0, n_coeffs) * 2j * np.pi) / n_coeffs
shift = np.exp(-shift * shift_samples)
shift = ArrayWithUnits(shift, [frequency_dim])
frequency_band = frequency_band or slice(None)
new_coeffs = coeffs.copy()
if coeffs.ndim == 1:
new_coeffs[frequency_band] *= shift[frequency_band]
return new_coeffs
slices = [slice(None) for _ in range(coeffs.ndim)]
slices[axis] = frequency_band
new_coeffs[tuple(slices)] *= shift[frequency_band]
return new_coeffs
def apply_scale(short_time_fft, scale, window=None):
magnitudes = np.abs(short_time_fft.real)
spectrogram = scale.apply(magnitudes, window)
dimensions = short_time_fft.dimensions[:-1] + (FrequencyDimension(scale),)
return ArrayWithUnits(spectrogram, dimensions)
def rainbowgram(time_frequency_repr, colormap=cm.rainbow):
# magnitudes on a log scale, and shifted and
# scaled to the unit interval
magnitudes = np.abs(time_frequency_repr.real)
magnitudes = log_modulus(magnitudes * 1000)
magnitudes = unit_scale(magnitudes)
angles = np.angle(time_frequency_repr)
angles = np.unwrap(angles, axis=0)
angles = np.gradient(angles)[0]
angles = unit_scale(angles)
colors = colormap(angles)
colors *= magnitudes[..., None]
# exclude the alpha channel, if there is one
colors = colors[..., :3]
arr = ArrayWithUnits(
colors, time_frequency_repr.dimensions + (IdentityDimension(),))
return arr
def fir_filter_bank(scale, taps, samplerate, window):
basis = np.zeros((len(scale), taps))
basis = ArrayWithUnits(basis, [
FrequencyDimension(scale),
TimeDimension(*samplerate)])
nyq = samplerate.nyquist
if window.ndim == 1:
window = repeat(window, len(scale))
for i, band, win in zip(range(len(scale)), scale, window):
start_hz = max(0, band.start_hz)
stop_hz = min(nyq, band.stop_hz)
freqs = np.linspace(
start_hz / nyq, stop_hz / nyq, len(win), endpoint=False)
freqs = [0] + list(freqs) + [1]
gains = [0] + list(win) + [0]
basis[i] = firwin2(taps, freqs, gains)
return basis
[docs]def morlet_filter_bank(
samplerate,
kernel_size,
scale,
scaling_factor,
normalize=True):
"""
Create a :class:`~zounds.core.ArrayWithUnits` instance with a
:class:`~zounds.timeseries.TimeDimension` and a
:class:`~zounds.spectral.FrequencyDimension` representing a bank of morlet
wavelets centered on the sub-bands of the scale.
Args:
samplerate (SampleRate): the samplerate of the input signal
kernel_size (int): the length in samples of each filter
scale (FrequencyScale): a scale whose center frequencies determine the
fundamental frequency of each filer
scaling_factor (int or list of int): Scaling factors for each band,
which determine the time-frequency resolution tradeoff.
The number(s) should fall between 0 and 1, with smaller numbers
achieving better frequency resolution, and larget numbers better
time resolution
normalize (bool): When true, ensure that each filter in the bank
has unit norm
See Also:
:class:`~zounds.spectral.FrequencyScale`
:class:`~zounds.timeseries.SampleRate`
"""
basis_size = len(scale)
basis = np.zeros((basis_size, kernel_size), dtype=np.complex128)
try:
if len(scaling_factor) != len(scale):
raise ValueError('scaling factor must have same length as scale')
except TypeError:
scaling_factor = np.repeat(float(scaling_factor), len(scale))
sr = int(samplerate)
for i, band in enumerate(scale):
scaling = scaling_factor[i]
w = band.center_frequency / (scaling * 2 * sr / kernel_size)
basis[i] = morlet(
M=kernel_size,
w=w,
s=scaling)
basis = basis.real
if normalize:
basis /= np.linalg.norm(basis, axis=-1, keepdims=True) + 1e-8
basis = ArrayWithUnits(
basis, [FrequencyDimension(scale), TimeDimension(*samplerate)])
return basis
def auto_correlogram(x, filter_bank, correlation_window=Milliseconds(30)):
n_filters = filter_bank.shape[0]
filter_size = filter_bank.shape[1]
corr_win_samples = int(correlation_window / x.samplerate.frequency)
windowed = sliding_window(x, filter_size, 1, flatten=False)
print(windowed.shape)
filtered = np.dot(windowed, filter_bank.T)
print(filtered.shape)
corr = sliding_window(
filtered,
ws=(corr_win_samples, n_filters),
ss=(1, n_filters),
flatten=False)
print(corr.shape)
padded_shape = list(corr.shape)
padded_shape[2] = corr_win_samples * 2
padded = np.zeros(padded_shape, dtype=np.float32)
padded[:, :, :corr_win_samples, :] = corr
print(padded.shape)
coeffs = np.fft.fft(padded, axis=2, norm='ortho')
correlated = np.fft.ifft(np.abs(coeffs) ** 2, axis=2, norm='ortho')
return np.concatenate([
correlated[:, :, corr_win_samples:, :],
correlated[:, :, :corr_win_samples, :],
], axis=2)
return correlated
def dct_basis(size):
r = np.arange(size)
basis = np.outer(r, r + 0.5)
basis = np.cos((np.pi / size) * basis)
return basis
def frequency_decomposition(x, sizes):
sizes = sorted(sizes)
if x.ndim == 1:
end = x.dimensions[0].end
x = ArrayWithUnits(
x[None, ...], [TimeDimension(end, end), x.dimensions[0]])
original_size = x.shape[-1]
time_dimension = x.dimensions[-1]
samplerate = audio_sample_rate(time_dimension.samples_per_second)
data = x.copy()
bands = []
frequency_bands = []
start_hz = 0
for size in sizes:
if size != original_size:
s = resample(data, size, axis=-1)
else:
s = data.copy()
bands.append(s)
data -= resample(s, original_size, axis=-1)
stop_hz = samplerate.nyquist * (size / original_size)
frequency_bands.append(FrequencyBand(start_hz, stop_hz))
start_hz = stop_hz
scale = ExplicitScale(frequency_bands)
return FrequencyAdaptive(bands, scale=scale, time_dimension=x.dimensions[0])