Source code for zounds.soundfile.resample



from ctypes import *

import numpy as np

from zounds.timeseries import SR44100, AudioSamples, Seconds
from zounds.core import ArrayWithUnits

try:
    libsamplerate = CDLL('libsamplerate.so')
except OSError as e:
    # KLUDGE: This is here to support building documentation on readthedocs
    pass

from featureflow import Node


class SRC_DATA(Structure):
    """
    A wrapper for the libsamplerate.SRC_DATA struct
    """
    _fields_ = [('data_in', POINTER(c_float)),
                ('data_out', POINTER(c_float)),
                ('input_frames', c_long),
                ('output_frames', c_long),
                ('input_frames_used', c_long),
                ('output_frames_gen', c_long),
                ('end_of_input', c_int),
                ('src_ratio', c_double), ]


class SRC_STATE(Structure):
    """
    A dummy structure to represent the state returned from libsamplerate
    src_new.
    """
    _fields_ = []


class Resample(object):
    """
    A wrapper around the libsamplerate src_process() method.  This class is
    intended for one-time use. New instances should be created for each sound\
    file processed.
    """

    def __init__(
            self,
            orig_sample_rate,
            new_sample_rate,
            nchannels=1,
            converter_type=1):

        """
        orig_sample_rate - The sample rate of the incoming samples, in hz
        new_sample_rate - The sample_rate of the outgoiing samples, in hz
        n_channels - Number of channels in the incoming and outgoing samples
        converter_type - See http://www.mega-nerd.com/SRC/api_misc.html#Converters
                         for a list of conversion types. "0" is the best-quality,
                         and slowest converter

        """
        super(Resample, self).__init__()
        self._ratio = new_sample_rate / orig_sample_rate
        # check if the conversion ratio is considered valid by libsamplerate
        if not libsamplerate.src_is_valid_ratio(c_double(self._ratio)):
            raise ValueError('%1.2f / %1.2f = %1.4f is not a valid ratio' % \
                             (new_sample_rate, orig_sample_rate, self._ratio))
        # create a pointer to the SRC_STATE struct, which maintains state
        # between calls to src_process()
        self.error = pointer(c_int(0))
        self.nchannels = nchannels
        self.converter_type = converter_type
        self.c_int_converter_type = c_int(converter_type)
        self.c_int_channels = c_int(self.nchannels)
        libsamplerate.src_new.restype = POINTER(SRC_STATE)
        self._state = libsamplerate.src_new(
            self.c_int_converter_type, self.c_int_channels, self.error)

    def _prepare_input(self, insamples):
        # ensure that the input is float data
        if np.float32 != insamples.dtype:
            return insamples.astype(np.float32)
        return insamples

    def _output_buffer(self, insamples):
        outsize = (int(np.round(len(insamples) * self._ratio)), self.nchannels)
        return np.zeros(outsize, dtype=np.float32).squeeze()

    def _check_for_error(self, return_code):
        if return_code:
            raise Exception(
                'libsamplerate sent non-zero return code {return_code}'
                    .format(**locals()))

    def __call__(self, insamples, end_of_input=False):

        normalized_insamples = self._prepare_input(insamples)
        outsamples = self._output_buffer(normalized_insamples)

        insamples_ptr = normalized_insamples.ctypes.data_as(POINTER(c_float))
        outsamples_ptr = outsamples.ctypes.data_as(POINTER(c_float))

        sd = SRC_DATA(
            # a pointer to the input samples
            data_in=insamples_ptr,
            # a pointer to the output buffer
            data_out=outsamples_ptr,
            # number of input samples
            input_frames=len(normalized_insamples),
            # number of output samples
            output_frames=len(outsamples),
            # NOT the end of input, i.e., there is more data to process
            end_of_input=int(end_of_input),
            # the conversion ratio
            src_ratio=self._ratio)
        sd_ptr = pointer(sd)
        rv = libsamplerate.src_process(self._state, sd_ptr)
        self._check_for_error(rv)
        return outsamples


[docs]class Resampler(Node): """ `Resampler` expects to process :class:`~zounds.timeseries.AudioSamples` instances (e.g., those produced by a :class:`AudioStream` node), and will produce a new stream of :class:`AudioSamples` at a new sampling rate. Args: samplerate (AudioSampleRate): the desired sampling rate. If none is provided, the default is :class:`~zounds.timeseries.SR44100` needs (Feature): a processing node that produces :class:`~zounds.timeseries.AudioSamples` Here's how you'd typically see :class:`Resampler` used in a processing graph. .. code:: python import featureflow as ff import zounds chunksize = zounds.ChunkSizeBytes( samplerate=zounds.SR44100(), duration=zounds.Seconds(30), bit_depth=16, channels=2) @zounds.simple_in_memory_settings class Document(ff.BaseModel): meta = ff.JSONFeature( zounds.MetaData, store=True, encoder=zounds.AudioMetaDataEncoder) raw = ff.ByteStreamFeature( ff.ByteStream, chunksize=chunksize, needs=meta, store=False) pcm = zounds.AudioSamplesFeature( zounds.AudioStream, needs=raw, store=True) resampled = zounds.AudioSamplesFeature( zounds.Resampler, samplerate=zounds.SR22050(), needs=pcm, store=True) synth = zounds.NoiseSynthesizer(zounds.SR11025()) samples = synth.synthesize(zounds.Seconds(10)) raw_bytes = samples.encode() _id = Document.process(meta=raw_bytes) doc = Document(_id) print doc.pcm.samplerate.__class__.__name__ # SR11025 print doc.resampled.samplerate.__class__.__name__ # SR22050 """ def __init__(self, samplerate=None, needs=None): super(Resampler, self).__init__(needs=needs) self._samplerate = samplerate or SR44100() self._resample = None def _noop(self, data, finalized): return data def _process(self, data): sr = data.samples_per_second if self._resample is None: target_sr = self._samplerate.samples_per_second self._resample = Resample( sr, target_sr, 1 if len(data.shape) == 1 else data.shape[1]) if target_sr != sr: self._rs = self._resample # KLUDGE: The following line seems to solve a bug whereby # libsamplerate doesn't generate enough samples the first time # src_process is called. We're calling it once here, so the "real" # output will come out click-free silence = AudioSamples.silence( self._samplerate, Seconds(1), channels=data.channels) self._resample(silence) else: self._rs = self._noop resampled = self._rs(data, self._finalized) if not isinstance(resampled, ArrayWithUnits): resampled = AudioSamples(resampled, self._samplerate) yield resampled