# Source code for zounds.soundfile.audiostream

from io import BytesIO
from os import SEEK_END
from soundfile import SoundFile
from zounds.timeseries import audio_sample_rate, AudioSamples
from .byte_depth import chunk_size_samples
from featureflow import Node


class AudioStream(Node):
    """
    `AudioStream` expects to process a raw stream of bytes (e.g. one produced
    by :class:`featureflow.ByteStream`) and produces chunks of
    :class:`~zounds.timeseries.AudioSamples`

    Args:
        sum_to_mono (bool): True if this node should return a
            :class:`~zounds.timeseries.AudioSamples` instance with a single
            channel
        needs (Feature): a processing node that produces a byte stream (e.g.
            :class:`~featureflow.ByteStream`)

    Here's how you'd typically see :class:`AudioStream` used in a processing
    graph.

    .. code:: python

        import featureflow as ff
        import zounds

        chunksize = zounds.ChunkSizeBytes(
            samplerate=zounds.SR44100(),
            duration=zounds.Seconds(30),
            bit_depth=16,
            channels=2)

        @zounds.simple_in_memory_settings
        class Document(ff.BaseModel):
            meta = ff.JSONFeature(
                zounds.MetaData,
                store=True,
                encoder=zounds.AudioMetaDataEncoder)

            raw = ff.ByteStreamFeature(
                ff.ByteStream,
                chunksize=chunksize,
                needs=meta,
                store=False)

            pcm = zounds.AudioSamplesFeature(
                zounds.AudioStream,
                needs=raw,
                store=True)

        synth = zounds.NoiseSynthesizer(zounds.SR11025())
        samples = synth.synthesize(zounds.Seconds(10))
        raw_bytes = samples.encode()
        _id = Document.process(meta=raw_bytes)
        doc = Document(_id)
        print(doc.pcm.__class__)  # returns an AudioSamples instance
    """

    def __init__(self, sum_to_mono=True, needs=None):
        super(AudioStream, self).__init__(needs=needs)
        self._sum_to_mono = sum_to_mono
        # The three attributes below are created lazily by _process() when the
        # first chunk of bytes arrives.
        self._buf = None
        self._sf = None
        self._chunk_size_samples = None
        self._cache = b''

    def _enqueue(self, data, pusher):
        """
        Append incoming `data` to the pending-bytes cache.  `pusher` is
        accepted but not used here.
        """
        self._cache += data

    def _dequeue(self):
        """
        Return everything accumulated in the cache and reset the cache to an
        empty byte string.
        """
        pending, self._cache = self._cache, b''
        return pending

    def _get_samples(self):
        """
        Read up to `self._chunk_size_samples` frames from the open sound file
        and wrap them in :class:`~zounds.timeseries.AudioSamples`, using the
        file's own sample rate.  The mono version is returned when this node
        was configured with `sum_to_mono`.
        """
        frames = self._sf.read(self._chunk_size_samples)
        rate = audio_sample_rate(self._sf.samplerate)
        chunk = AudioSamples(frames, rate)
        return chunk.mono if self._sum_to_mono else chunk

    def _process(self, data):
        """
        Feed one chunk of raw bytes into the in-memory buffer and yield the
        audio samples that can be decoded from it.
        """
        # TODO: Use the _first_chunk() hook instead of the if statement
        if self._buf is None:
            self._buf = MemoryBuffer(data.total_length)

        # The bytes must be in the buffer before the SoundFile is opened on
        # it, so the order of the following statements matters.
        self._buf.write(data)

        if self._sf is None:
            self._sf = SoundFile(self._buf)

        if self._chunk_size_samples is None:
            self._chunk_size_samples = chunk_size_samples(self._sf, data)

        yield self._get_samples()

    def _last_chunk(self):
        """
        Keep yielding sample chunks until a read comes back empty.
        """
        while True:
            chunk = self._get_samples()
            if not chunk.size:
                break
            yield chunk
class MemoryBuffer(object):
    """
    This class is the implementation of the virtual io interface required by
    PySoundfile/libsndfile.  It wraps a ``BytesIO``: :meth:`write` always
    appends at the end while leaving the read cursor where it was, and the
    read methods consume from that cursor.

    Some of the stateful/hacky things in this class (see the KLUDGE note in
    :meth:`seek`) could be avoided if this class had the following
    modifications:

    - it maintains its own position
    - it maintains the span of its buffer/BytesIO instance

    Args:
        content_length (int): the value reported by the first ``tell()`` call
            that follows a seek relative to the end (presumably the total
            byte length of the stream -- confirm against the caller)
        max_size (int): once the read cursor has moved past this offset, the
            next :meth:`write` throws away the bytes that were already read
    """

    def __init__(self, content_length, max_size=10 * 1024 * 1024):
        super(MemoryBuffer, self).__init__()
        self._content_length = content_length
        self._max_size = max_size
        self._buf = BytesIO()
        # `tell` is an instance attribute so that seek() can temporarily swap
        # in a different implementation.
        self.tell = self._tell
        self._total_bytes_written = 0
        self._total_bytes_read = 0

    def read(self, count):
        """
        Return up to `count` bytes from the current position, or everything
        that remains when `count` is -1.
        """
        if count != -1:
            return self._buf.read(count)
        return self._buf.read()

    def readinto(self, buf):
        """
        Fill the front of the writable buffer `buf` with as many bytes as are
        available (at most ``len(buf)``) and return how many were copied.
        """
        chunk = self.read(len(buf))
        copied = len(chunk)
        buf[:copied] = chunk
        return copied

    def write(self, data):
        """
        Append `data` to the end of the buffer without disturbing the read
        cursor, and return ``len(data)``.
        """
        cursor = self._buf.tell()
        if cursor > self._max_size:
            # Too many consumed bytes are being held on to: start over with a
            # fresh buffer that contains only the unread remainder.
            self._buf = BytesIO(self._buf.read())
            cursor = 0
        self._buf.seek(0, SEEK_END)
        self._buf.write(data)
        # Put the read cursor back where the reader left off.
        self._buf.seek(cursor)
        return len(data)

    def _tell(self):
        """Regular ``tell``: the position of the underlying buffer."""
        return self._buf.tell()

    def _tell_end(self):
        """
        One-shot ``tell``: report the declared content length and put the
        regular implementation back in place.
        """
        self.tell = self._tell
        return self._content_length

    def seek(self, offset, whence):
        """
        Move the underlying buffer's cursor.  After a seek relative to the
        end, the next ``tell()`` call answers with the declared content
        length instead of the buffer position.
        """
        if whence == SEEK_END:
            # KLUDGE: PySoundfile no longer supports __len__, which means that
            # this stateful garbage is required.
            self.tell = self._tell_end
        self._buf.seek(offset, whence)

    def flush(self):
        """Flush the underlying buffer."""
        self._buf.flush()