Source code for zounds.timeseries.audiosamples

from .samplerate import AudioSampleRate, audio_sample_rate
from soundfile import SoundFile
from io import BytesIO
from zounds.core import IdentityDimension, ArrayWithUnits
from .timeseries import TimeDimension, TimeSlice
from .duration import Picoseconds, Seconds
from .samplerate import SampleRate
import numpy as np


[docs]class AudioSamples(ArrayWithUnits): """ `AudioSamples` represents constant-rate samples of a continuous audio signal at common sampling rates. It is a special case of an :class:`~zounds.core.ArrayWithUnits` whose first dimension is a :class:`~zounds.timeseries.TimeDimension` that has a common audio sampling rate (e.g. :class:`~zounds.timeseries.SR44100`). Args: array (np.ndarray): The raw sample data samplerate (SampleRate): The rate at which data was sampled Raises: ValueError: When array has a second dimension with size greater than 2 TypeError: When samplerate is not a :class:`~zounds.timeseries.AudioSampleRate` (e.g. :class:`~zounds.timeseries.SR22050`) Examples:: >>> from zounds import AudioSamples, SR44100, TimeSlice, Seconds >>> import numpy as np >>> raw = np.random.normal(0, 1, 44100*10) >>> samples = AudioSamples(raw, SR44100()) >>> samples.samples_per_second 44100 >>> samples.channels 1 >>> sliced = samples[TimeSlice(Seconds(2))] >>> sliced.shape (88200,) """ def __new__(cls, array, samplerate): if array.ndim == 1: dimensions = [TimeDimension(*samplerate)] elif array.ndim == 2: dimensions = [TimeDimension(*samplerate), IdentityDimension()] else: raise ValueError( 'array must be one (mono) or two (multi-channel) dimensions') if not isinstance(samplerate, AudioSampleRate): raise TypeError('samplerate should be an AudioSampleRate instance') return ArrayWithUnits.__new__(cls, array, dimensions) def __add__(self, other): try: if self.samplerate != other.samplerate: raise ValueError( 'Samplerates must match, but they were ' '{self.samplerate} and {other.samplerate}' .format(**locals())) except AttributeError: pass return super(AudioSamples, self).__add__(other) def kwargs(self): return {'samplerate': self.samplerate} def sum(self, axis=None, dtype=None, **kwargs): result = super(AudioSamples, self).sum(axis, dtype, **kwargs) if self.ndim == 2 and axis == 1: return AudioSamples(result, self.samplerate) else: return result @classmethod def from_file(cls, file_like_object): with SoundFile(file_like_object, mode='r') as f: samples = f.read(dtype=np.float32) return AudioSamples(samples, audio_sample_rate(f.samplerate)) @classmethod def silence(cls, samplerate, duration, dtype=np.float32, channels=1): shape = (int(duration / samplerate.frequency), channels) silence = np.zeros(shape, dtype=dtype).squeeze() return cls(silence, samplerate) def silence_like(self, duration): x = self.__class__.silence( self.samplerate, duration, self.dtype, self.channels) x[:] = 1 return x def pad_with_silence(self, silence_duration=Seconds(1)): silence = self.__class__.silence( self.samplerate, silence_duration, self.dtype) return AudioSamples(np.concatenate([self, silence]), self.samplerate) @property def samples_per_second(self): return int(Picoseconds(int(1e12)) / self.frequency) @property def duration_in_seconds(self): return self.duration / Picoseconds(int(1e12)) @property def samplerate(self): return SampleRate(self.frequency, self.duration) @property def overlap(self): return self.samplerate.overlap @property def span(self): return self.dimensions[0].span @property def end(self): return self.dimensions[0].end @property def frequency(self): return self.dimensions[0].frequency @property def duration(self): return self.dimensions[0].duration
[docs] @classmethod def from_example(cls, arr, example): return cls(arr, example.samplerate)
@property def channels(self): if len(self.shape) == 1: return 1 return self.shape[1] @property def samplerate(self): return audio_sample_rate(self.samples_per_second) @property def mono(self): """ Return this instance summed to mono. If the instance is already mono, this is a no-op. """ if self.channels == 1: return self x = self.sum(axis=1) * 0.5 y = x * 0.5 return AudioSamples(y, self.samplerate) @property def stereo(self): if self.channels == 2: return self return AudioSamples(np.vstack([self, self]).T, self.samplerate) def __getitem__(self, item): sliced = super(AudioSamples, self).__getitem__(item) try: if sliced.dimensions == self.dimensions: return AudioSamples(sliced, self.samplerate) except AttributeError: pass return sliced def sliding_window(self, samplerate, padding=True): ws = TimeSlice(duration=samplerate.duration) ss = TimeSlice(duration=samplerate.frequency) _, windowed = self.sliding_window_with_leftovers(ws, ss, dopad=padding) return windowed
[docs] def encode(self, flo=None, fmt='WAV', subtype='PCM_16'): """ Return audio samples encoded as bytes given a particular audio format Args: flo (file-like): A file-like object to write the bytes to. If flo is not supplied, a new :class:`io.BytesIO` instance will be created and returned fmt (str): A libsndfile-friendly identifier for an audio encoding (detailed here: http://www.mega-nerd.com/libsndfile/api.html) subtype (str): A libsndfile-friendly identifier for an audio encoding subtype (detailed here: http://www.mega-nerd.com/libsndfile/api.html) Examples: >>> from zounds import SR11025, AudioSamples >>> import numpy as np >>> silence = np.zeros(11025*10) >>> samples = AudioSamples(silence, SR11025()) >>> bio = samples.encode() >>> bio.read(10) 'RIFFx]\\x03\\x00WA' """ flo = flo or BytesIO() with SoundFile( flo, mode='w', channels=self.channels, format=fmt, subtype=subtype, samplerate=self.samples_per_second) as f: if fmt == 'OGG': # KLUDGE: Trying to write too-large chunks to an ogg file seems # to cause a segfault in libsndfile # KLUDGE: This logic is very similar to logic in the OggVorbis # processing node, and should probably be factored into a common # location factor = 20 chunksize = self.samples_per_second * factor for i in range(0, len(self), chunksize): chunk = self[i: i + chunksize] f.write(chunk) else: # write everything in one chunk f.write(self) flo.seek(0) return flo
def save(self, filename, fmt='WAV', subtype='PCM_16'): with open(filename, 'wb') as f: self.encode(f, fmt=fmt, subtype=subtype)