fba8175e99
Zero pads the audio time samples until the length is equal to the next largest power of two. This improves the algorithmic complexity of the FFT calculations. Initialized the Hamming window when the module is loaded instead of every loop. Replaced a call to numpy.roll() with direct array index manipulation.
345 lines
13 KiB
Python
345 lines
13 KiB
Python
from __future__ import print_function
|
|
from __future__ import division
|
|
import time
|
|
import numpy as np
|
|
from scipy.ndimage.filters import gaussian_filter1d
|
|
import config
|
|
import microphone
|
|
import dsp
|
|
import led
|
|
|
|
_time_prev = time.time() * 1000.0
|
|
"""The previous time that the frames_per_second() function was called"""
|
|
|
|
_fps = dsp.ExpFilter(val=config.FPS, alpha_decay=0.002, alpha_rise=0.002)
|
|
"""The low-pass filter used to estimate frames-per-second"""
|
|
|
|
|
|
def frames_per_second():
|
|
"""Return the estimated frames per second
|
|
|
|
Returns the current estimate for frames-per-second (FPS).
|
|
FPS is estimated by measured the amount of time that has elapsed since
|
|
this function was previously called. The FPS estimate is low-pass filtered
|
|
to reduce noise.
|
|
|
|
This function is intended to be called one time for every iteration of
|
|
the program's main loop.
|
|
|
|
Returns
|
|
-------
|
|
fps : float
|
|
Estimated frames-per-second. This value is low-pass filtered
|
|
to reduce noise.
|
|
"""
|
|
global _time_prev, _fps
|
|
time_now = time.time() * 1000.0
|
|
dt = time_now - _time_prev
|
|
_time_prev = time_now
|
|
if dt == 0.0:
|
|
return _fps.value
|
|
return _fps.update(1000.0 / dt)
|
|
|
|
|
|
def interpolate(y, new_length):
|
|
"""Intelligently resizes the array by linearly interpolating the values
|
|
|
|
Parameters
|
|
----------
|
|
y : np.array
|
|
Array that should be resized
|
|
|
|
new_length : int
|
|
The length of the new interpolated array
|
|
|
|
Returns
|
|
-------
|
|
z : np.array
|
|
New array with length of new_length that contains the interpolated
|
|
values of y.
|
|
"""
|
|
if len(y) == new_length:
|
|
return y
|
|
x_old = np.linspace(0, 1, len(y))
|
|
x_new = np.linspace(0, 1, new_length)
|
|
z = np.interp(x_new, x_old, y)
|
|
return z
|
|
|
|
|
|
r_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.2, alpha_rise=0.99)
|
|
g_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.05, alpha_rise=0.3)
|
|
b_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.1, alpha_rise=0.5)
|
|
common_mode = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.99, alpha_rise=0.01)
|
|
p_filt = dsp.ExpFilter(np.tile(1, (3, config.N_PIXELS // 2)),
|
|
alpha_decay=0.1, alpha_rise=0.99)
|
|
p = np.tile(1.0, (3, config.N_PIXELS // 2))
|
|
gain = dsp.ExpFilter(np.tile(0.01, config.N_FFT_BINS),
|
|
alpha_decay=0.001, alpha_rise=0.99)
|
|
|
|
|
|
def visualize_scroll(y):
|
|
"""Effect that originates in the center and scrolls outwards"""
|
|
global p
|
|
y = np.copy(y)**2.0
|
|
gain.update(y)
|
|
y /= gain.value
|
|
y *= 255.0
|
|
r = int(max(y[:len(y) // 3]))
|
|
g = int(max(y[len(y) // 3: 2 * len(y) // 3]))
|
|
b = int(max(y[2 * len(y) // 3:]))
|
|
# Scrolling effect window
|
|
p = np.roll(p, 1, axis=1)
|
|
p *= 0.98
|
|
p = gaussian_filter1d(p, sigma=0.2)
|
|
# Create new color originating at the center
|
|
p[0, 0] = r
|
|
p[1, 0] = g
|
|
p[2, 0] = b
|
|
# Update the LED strip
|
|
return np.concatenate((p[:, ::-1], p), axis=1)
|
|
|
|
|
|
def visualize_energy(y):
|
|
"""Effect that expands from the center with increasing sound energy"""
|
|
global p
|
|
y = np.copy(y)
|
|
gain.update(y)
|
|
y /= gain.value
|
|
# Scale by the width of the LED strip
|
|
y *= float((config.N_PIXELS // 2) - 1)
|
|
# Map color channels according to energy in the different freq bands
|
|
scale = 0.9
|
|
r = int(np.mean(y[:len(y) // 3]**scale))
|
|
g = int(np.mean(y[len(y) // 3: 2 * len(y) // 3]**scale))
|
|
b = int(np.mean(y[2 * len(y) // 3:]**scale))
|
|
# Assign color to different frequency regions
|
|
p[0, :r] = 255.0
|
|
p[0, r:] = 0.0
|
|
p[1, :g] = 255.0
|
|
p[1, g:] = 0.0
|
|
p[2, :b] = 255.0
|
|
p[2, b:] = 0.0
|
|
p_filt.update(p)
|
|
p = np.round(p_filt.value)
|
|
# Apply substantial blur to smooth the edges
|
|
p[0, :] = gaussian_filter1d(p[0, :], sigma=4.0)
|
|
p[1, :] = gaussian_filter1d(p[1, :], sigma=4.0)
|
|
p[2, :] = gaussian_filter1d(p[2, :], sigma=4.0)
|
|
# Set the new pixel value
|
|
return np.concatenate((p[:, ::-1], p), axis=1)
|
|
|
|
|
|
_prev_spectrum = np.tile(0.01, config.N_PIXELS // 2)
|
|
|
|
|
|
def visualize_spectrum(y):
|
|
"""Effect that maps the Mel filterbank frequencies onto the LED strip"""
|
|
global _prev_spectrum
|
|
y = np.copy(interpolate(y, config.N_PIXELS // 2))
|
|
common_mode.update(gaussian_filter1d(y, sigma=2.0))
|
|
diff = y - _prev_spectrum
|
|
_prev_spectrum = np.copy(y)
|
|
r = gaussian_filter1d(y, sigma=0.5) - common_mode.value
|
|
# g = gaussian_filter1d(y, sigma=0.5) - common_mode.value
|
|
b = gaussian_filter1d(y, sigma=0.0) - common_mode.value
|
|
# Update temporal filters
|
|
r = r_filt.update(r)
|
|
# g = g_filt.update(g)
|
|
g = np.abs(diff)
|
|
b = b_filt.update(b)
|
|
# Mirror the color channels for symmetric output
|
|
pixel_r = np.concatenate((r[::-1], r))
|
|
pixel_g = np.concatenate((g[::-1], g))
|
|
pixel_b = np.concatenate((b[::-1], b))
|
|
output = np.array([pixel_r, pixel_g, pixel_b]) * 255.0
|
|
return output
|
|
|
|
|
|
fft_plot_filter = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.5, alpha_rise=0.99)
|
|
mel_gain = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.01, alpha_rise=0.99)
|
|
mel_smoothing = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.5, alpha_rise=0.99)
|
|
volume = dsp.ExpFilter(config.MIN_VOLUME_THRESHOLD,
|
|
alpha_decay=0.02, alpha_rise=0.02)
|
|
fft_window = np.hamming(int(config.MIC_RATE / config.FPS) * config.N_ROLLING_HISTORY)
|
|
# Keeps track of the number of buffer overflows
|
|
# Lots of buffer overflows could mean that FPS is set too high
|
|
buffer_overflows = 1
|
|
|
|
|
|
def microphone_update(stream):
|
|
global y_roll, prev_rms, prev_exp
|
|
# Retrieve and normalize the new audio samples
|
|
try:
|
|
y = np.fromstring(stream.read(samples_per_frame), dtype=np.int16)
|
|
except IOError:
|
|
y = y_roll[config.N_ROLLING_HISTORY - 1, :]
|
|
global buffer_overflows
|
|
print('Buffer overflows: {0}'.format(buffer_overflows))
|
|
buffer_overflows += 1
|
|
# Normalize samples between 0 and 1
|
|
y = y / 2.0**15
|
|
# Construct a rolling window of audio samples
|
|
y_roll[:-1] = y_roll[1:]
|
|
y_roll[-1, :] = np.copy(y)
|
|
y_data = np.concatenate(y_roll, axis=0)
|
|
volume.update(np.nanmean(y_data ** 2))
|
|
|
|
if volume.value < config.MIN_VOLUME_THRESHOLD:
|
|
print('No audio input. Volume below threshold. Volume:', volume.value)
|
|
led.pixels = np.tile(0, (3, config.N_PIXELS))
|
|
led.update()
|
|
else:
|
|
# Transform audio input into the frequency domain
|
|
N = len(y_data)
|
|
N_zeros = 2**int(np.ceil(np.log2(N))) - N
|
|
# Pad with zeros until the next power of two
|
|
y_data *= fft_window
|
|
y_padded = np.pad(y_data, (0, N_zeros), mode='constant')
|
|
YS = np.abs(np.fft.rfft(y_padded)[:N // 2])
|
|
# Construct a Mel filterbank from the FFT data
|
|
mel = np.atleast_2d(np.abs(YS)).T * dsp.mel_y.T
|
|
# Scale data to values more suitable for visualization
|
|
mel = np.mean(mel, axis=0)
|
|
mel = mel**2.0
|
|
# Gain normalization
|
|
mel_gain.update(np.max(gaussian_filter1d(mel, sigma=1.0)))
|
|
mel = mel / mel_gain.value
|
|
mel = mel_smoothing.update(mel)
|
|
# Map filterbank output onto LED strip
|
|
output = visualization_effect(mel)
|
|
led.pixels = output
|
|
led.update()
|
|
if config.USE_GUI:
|
|
# Plot filterbank output
|
|
x = np.linspace(config.MIN_FREQUENCY, config.MAX_FREQUENCY, len(mel))
|
|
mel_curve.setData(x=x, y=fft_plot_filter.update(mel))
|
|
# Plot the color channels
|
|
r_curve.setData(y=led.pixels[0])
|
|
g_curve.setData(y=led.pixels[1])
|
|
b_curve.setData(y=led.pixels[2])
|
|
if config.USE_GUI:
|
|
app.processEvents()
|
|
if config.DISPLAY_FPS:
|
|
print('FPS {:.0f} / {:.0f}'.format(frames_per_second(), config.FPS))
|
|
|
|
|
|
# Number of audio samples to read every time frame
|
|
samples_per_frame = int(config.MIC_RATE / config.FPS)
|
|
|
|
# Array containing the rolling audio sample window
|
|
y_roll = np.random.rand(config.N_ROLLING_HISTORY, samples_per_frame) / 1e16
|
|
|
|
visualization_effect = visualize_spectrum
|
|
"""Visualization effect to display on the LED strip"""
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if config.USE_GUI:
|
|
import pyqtgraph as pg
|
|
from pyqtgraph.Qt import QtGui, QtCore
|
|
# Create GUI window
|
|
app = QtGui.QApplication([])
|
|
view = pg.GraphicsView()
|
|
layout = pg.GraphicsLayout(border=(100,100,100))
|
|
view.setCentralItem(layout)
|
|
view.show()
|
|
view.setWindowTitle('Visualization')
|
|
view.resize(800,600)
|
|
# Mel filterbank plot
|
|
fft_plot = layout.addPlot(title='Filterbank Output', colspan=3)
|
|
fft_plot.setRange(yRange=[-0.1, 1.2])
|
|
fft_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
|
|
x_data = np.array(range(1, config.N_FFT_BINS + 1))
|
|
mel_curve = pg.PlotCurveItem()
|
|
mel_curve.setData(x=x_data, y=x_data*0)
|
|
fft_plot.addItem(mel_curve)
|
|
# Visualization plot
|
|
layout.nextRow()
|
|
led_plot = layout.addPlot(title='Visualization Output', colspan=3)
|
|
led_plot.setRange(yRange=[-5, 260])
|
|
led_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
|
|
# Pen for each of the color channel curves
|
|
r_pen = pg.mkPen((255, 30, 30, 200), width=4)
|
|
g_pen = pg.mkPen((30, 255, 30, 200), width=4)
|
|
b_pen = pg.mkPen((30, 30, 255, 200), width=4)
|
|
# Color channel curves
|
|
r_curve = pg.PlotCurveItem(pen=r_pen)
|
|
g_curve = pg.PlotCurveItem(pen=g_pen)
|
|
b_curve = pg.PlotCurveItem(pen=b_pen)
|
|
# Define x data
|
|
x_data = np.array(range(1, config.N_PIXELS + 1))
|
|
r_curve.setData(x=x_data, y=x_data*0)
|
|
g_curve.setData(x=x_data, y=x_data*0)
|
|
b_curve.setData(x=x_data, y=x_data*0)
|
|
# Add curves to plot
|
|
led_plot.addItem(r_curve)
|
|
led_plot.addItem(g_curve)
|
|
led_plot.addItem(b_curve)
|
|
# Frequency range label
|
|
freq_label = pg.LabelItem('')
|
|
# Frequency slider
|
|
def freq_slider_change(tick):
|
|
minf = freq_slider.tickValue(0)**2.0 * (config.MIC_RATE / 2.0)
|
|
maxf = freq_slider.tickValue(1)**2.0 * (config.MIC_RATE / 2.0)
|
|
t = 'Frequency range: {:.0f} - {:.0f} Hz'.format(minf, maxf)
|
|
freq_label.setText(t)
|
|
config.MIN_FREQUENCY = minf
|
|
config.MAX_FREQUENCY = maxf
|
|
dsp.create_mel_bank()
|
|
freq_slider = pg.TickSliderItem(orientation='bottom', allowAdd=False)
|
|
freq_slider.addTick((config.MIN_FREQUENCY / (config.MIC_RATE / 2.0))**0.5)
|
|
freq_slider.addTick((config.MAX_FREQUENCY / (config.MIC_RATE / 2.0))**0.5)
|
|
freq_slider.tickMoveFinished = freq_slider_change
|
|
freq_label.setText('Frequency range: {} - {} Hz'.format(
|
|
config.MIN_FREQUENCY,
|
|
config.MAX_FREQUENCY))
|
|
# Effect selection
|
|
active_color = '#16dbeb'
|
|
inactive_color = '#FFFFFF'
|
|
def energy_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_energy
|
|
energy_label.setText('Energy', color=active_color)
|
|
scroll_label.setText('Scroll', color=inactive_color)
|
|
spectrum_label.setText('Spectrum', color=inactive_color)
|
|
def scroll_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_scroll
|
|
energy_label.setText('Energy', color=inactive_color)
|
|
scroll_label.setText('Scroll', color=active_color)
|
|
spectrum_label.setText('Spectrum', color=inactive_color)
|
|
def spectrum_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_spectrum
|
|
energy_label.setText('Energy', color=inactive_color)
|
|
scroll_label.setText('Scroll', color=inactive_color)
|
|
spectrum_label.setText('Spectrum', color=active_color)
|
|
# Create effect "buttons" (labels with click event)
|
|
energy_label = pg.LabelItem('Energy')
|
|
scroll_label = pg.LabelItem('Scroll')
|
|
spectrum_label = pg.LabelItem('Spectrum')
|
|
energy_label.mousePressEvent = energy_click
|
|
scroll_label.mousePressEvent = scroll_click
|
|
spectrum_label.mousePressEvent = spectrum_click
|
|
energy_click(0)
|
|
# Layout
|
|
layout.nextRow()
|
|
layout.addItem(freq_label, colspan=3)
|
|
layout.nextRow()
|
|
layout.addItem(freq_slider, colspan=3)
|
|
layout.nextRow()
|
|
layout.addItem(energy_label)
|
|
layout.addItem(scroll_label)
|
|
layout.addItem(spectrum_label)
|
|
# Initialize LEDs
|
|
led.update()
|
|
# Start listening to live audio stream
|
|
microphone.start_stream(microphone_update)
|