# Source code for mrcrowbar.sound

from array import array
from enum import IntEnum
import math

from mrcrowbar import encoding
from mrcrowbar.common import is_bytes, bounds

try:
    import pyaudio
except ImportError:
    pyaudio = None

# Number of output samples held by each buffer that resample_audio_iter() yields.
RESAMPLE_BUFFER = 4096
# Default chunk_size for normalise_audio_iter(); resample_audio_iter() also
# uses it as the size of the source window it steps through per chunk.
NORMALIZE_BUFFER = 8192
# Default output_rate for resample_audio_iter(), and the playback rate that
# play_pcm() uses whenever interpolation is not AudioInterpolation.NONE.
RESAMPLE_RATE = 44100
# Name of the pyaudio module attribute that play_pcm() looks up (via getattr)
# to obtain the sample format for the output stream.
PYAUDIO_NORMALISE_TYPE = 'paFloat32'

class AudioInterpolation( IntEnum ):
    """Interpolation algorithms selectable when resampling audio for playback."""

    #: Perform no audio interpolation and let PortAudio sort it out.
    #: (Sounds like CUBIC)
    NONE = 0
    #: Perform sharp linear interpolation between samples.
    #: This is the algorithm used by most early DSPs, such as the one in the original
    #: Sound Blaster, and has a pleasing brightness and crispness to it.
    LINEAR = 1
    #: Perform sharp step interpolation between samples.
    #: I'm sure if you go nasty enough, you could find a DSP that sounds like this.
    #: This sounds like LINEAR, except with more distortion.
    STEP = 2
def mix_linear( a, b, alpha ):
    """Linearly interpolate from a to b.

    a
        Value returned when alpha is 0.

    b
        Value returned when alpha is 1.

    alpha
        Blend position between a and b.
    """
    return (b-a)*alpha+a


def mix_step( a, b, alpha ):
    """Step "interpolation": always return a, ignoring b and alpha.

    Takes the same arguments as mix_linear so the two are interchangeable.
    """
    return a
def normalise_audio( source, format_type, field_size, signedness, endian, start=None, end=None, length=None ):
    """Decode a byte string of PCM samples into an array of floats.

    source
        The byte string to decode.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        'signed' for signed integer samples; any other value is treated
        as unsigned.

    endian
        Endianness of each sample; passed through to encoding.unpack_array.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    Returns an array of type 'f'. Float samples are passed through as-is.
    Integer samples are divided by 2**(field_size*8-1); unsigned samples
    have that same value subtracted first so they are centred on zero.
    Any other format_type produces an empty array.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    spec = (format_type, field_size, signedness, endian)
    data = source[start:end]

    if format_type == float:
        return array( 'f', encoding.unpack_array( spec, data ) )

    if format_type == int:
        # Half the integer range, e.g. 128 for 8-bit and 32768 for 16-bit.
        divisor = 1 << (field_size*8-1)
        if signedness == 'signed':
            return array( 'f', (float( x )/divisor for x in encoding.unpack_array( spec, data )) )
        # Unsigned: shift the midpoint down to zero before scaling.
        return array( 'f', (float( x-divisor )/divisor for x in encoding.unpack_array( spec, data )) )

    return array( 'f' )
def normalise_audio_iter( source, format_type, field_size, signedness, endian, start=None, end=None, length=None, overlap=0, chunk_size=NORMALIZE_BUFFER ):
    """Decode a byte string of PCM samples chunk by chunk.

    Generator equivalent of normalise_audio(): yields one array of floats
    per chunk instead of decoding the whole range at once.

    source
        The byte string to decode.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample; passed through to normalise_audio().

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    overlap
        Number of extra samples to decode past the end of each chunk, so a
        consumer can look ahead across the chunk boundary (default: 0).

    chunk_size
        Distance in bytes between the start offsets of successive chunks.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    # Bytes decoded per chunk: the chunk itself plus the look-ahead samples.
    increment = chunk_size+overlap*field_size

    for offset in range( start, end, chunk_size ):
        # Never decode past the requested end: without the clamp the last
        # chunk (and its overlap) would pull in bytes outside [start, end).
        chunk_end = min( offset+increment, end )
        yield normalise_audio( source, format_type, field_size, signedness, endian, start=offset, end=chunk_end )
def resample_audio_iter( source, format_type, field_size, signedness, endian, channels, sample_rate, start=None, end=None, length=None, interpolation=AudioInterpolation.LINEAR, output_rate=RESAMPLE_RATE ):
    """Resample a byte string of PCM audio, yielding buffers of float samples.

    source
        The byte string to resample.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample; passed through to the decoder.

    channels
        Number of audio channels. Samples are assumed to be interleaved
        frame by frame (one sample per channel, repeated).

    sample_rate
        Sample rate of the source audio. A rate of 0 yields a single empty
        array and stops.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    interpolation
        AudioInterpolation.STEP selects mix_step; every other value
        (including NONE) uses mix_linear.

    output_rate
        Sample rate of the generated audio (default: RESAMPLE_RATE).

    Each yielded array holds a whole number of frames and at most
    RESAMPLE_BUFFER samples. Every buffer has the same length; the tail of
    the final one is left as zeros (silence) once the source runs out.
    """
    if sample_rate == 0:
        yield array( 'f' )
        return
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    mixer = mix_linear
    if interpolation == AudioInterpolation.STEP:
        mixer = mix_step

    # Work in frames (one sample per channel). start/end are byte offsets, so
    # positions must be scaled by the frame size to stay aligned for anything
    # other than 8-bit mono.
    frame_size = field_size*channels
    if frame_size <= 0:
        raise ValueError( 'field_size and channels must both be positive' )
    src_frames = (end-start)//frame_size
    new_frames = src_frames*output_rate//sample_rate

    # Source is decoded in windows of src_inc frames; normalise_audio_iter
    # takes its chunk size in bytes and its overlap in samples, so one extra
    # frame (channels samples) is decoded for look-ahead across the boundary.
    src_inc = NORMALIZE_BUFFER
    chunk_size = src_inc*frame_size
    src_iter = normalise_audio_iter( source, format_type, field_size, signedness, endian, start, end, overlap=channels, chunk_size=chunk_size )
    src = next( src_iter, None )
    src_bound = src_inc

    # Keep each output buffer to a whole number of frames.
    frames_per_buffer = max( 1, RESAMPLE_BUFFER//channels )
    buffer_len = frames_per_buffer*channels

    for frame_base in range( 0, new_frames, frames_per_buffer ):
        buffer = array( 'f', [0.0] )*buffer_len
        for frame in range( frames_per_buffer ):
            # Fractional position of this output frame in the source.
            src_pos = sample_rate*(frame_base+frame)/output_rate

            # Move to the chunk containing src_pos. The comparison has to be
            # >=: a position exactly on the boundary is frame 0 of the next
            # chunk, not frame 0 of the current one.
            while src is not None and src_pos >= src_bound:
                src = next( src_iter, None )
                src_bound += src_inc

            if src is None:
                break

            samp_index = (math.floor( src_pos ) % src_inc)*channels
            alpha = math.fmod( src_pos, 1.0 )
            out_index = frame*channels

            for channel in range( channels ):
                cur = samp_index+channel
                # Same channel, one frame later; silence past the end of data.
                nxt = cur+channels
                a = 0.0 if cur >= len( src ) else src[cur]
                b = 0.0 if nxt >= len( src ) else src[nxt]
                buffer[out_index+channel] = mixer( a, b, alpha )

        yield buffer
def play_pcm( source, channels, sample_rate, format_type, field_size, signedness, endian, start=None, end=None, length=None, interpolation=AudioInterpolation.LINEAR ):
    """Play back a byte string as PCM audio.

    source
        The byte string to play.

    channels
        Number of audio channels.

    sample_rate
        Audio sample rate in Hz.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample; either 'big', 'little' or None.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    interpolation
        Interpolation algorithm to use for upsampling. Defaults to
        AudioInterpolation.LINEAR.

    Raises ImportError if pyaudio is not installed.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    if not pyaudio:
        raise ImportError( 'pyaudio must be installed for audio playback support (see https://people.csail.mit.edu/hubert/pyaudio)' )

    # With no interpolation the stream runs at the source rate and the audio
    # backend handles conversion; otherwise resample to RESAMPLE_RATE here.
    if interpolation == AudioInterpolation.NONE:
        playback_rate = sample_rate
    else:
        playback_rate = RESAMPLE_RATE

    audio = pyaudio.PyAudio()
    try:
        sample_format = getattr( pyaudio, PYAUDIO_NORMALISE_TYPE )
        samp_iter = resample_audio_iter( source, format_type, field_size, signedness, endian, channels, sample_rate, start, end, output_rate=playback_rate, interpolation=interpolation )
        stream = audio.open( format=sample_format, channels=channels, rate=playback_rate, output=True )
        try:
            for samples in samp_iter:
                stream.write( samples.tobytes() )
            stream.stop_stream()
        finally:
            # Close the stream even if decoding or a write fails part-way.
            stream.close()
    finally:
        # Release the PortAudio resources held by this PyAudio instance.
        audio.terminate()