35c26ca7bb
Over a dozen small performance optimizations * Memoization for linspace generation * Removed unnecessary copies * Limited the rate at which information is printed. Excessive `print()` output was causing issues for some SSH users
357 lines
13 KiB
Python
357 lines
13 KiB
Python
from __future__ import print_function
|
|
from __future__ import division
|
|
import time
|
|
import numpy as np
|
|
from scipy.ndimage.filters import gaussian_filter1d
|
|
import config
|
|
import microphone
|
|
import dsp
|
|
import led
|
|
|
|
_time_prev = time.time() * 1000.0
|
|
"""The previous time that the frames_per_second() function was called"""
|
|
|
|
_fps = dsp.ExpFilter(val=config.FPS, alpha_decay=0.2, alpha_rise=0.2)
|
|
"""The low-pass filter used to estimate frames-per-second"""
|
|
|
|
|
|
def frames_per_second():
|
|
"""Return the estimated frames per second
|
|
|
|
Returns the current estimate for frames-per-second (FPS).
|
|
FPS is estimated by measured the amount of time that has elapsed since
|
|
this function was previously called. The FPS estimate is low-pass filtered
|
|
to reduce noise.
|
|
|
|
This function is intended to be called one time for every iteration of
|
|
the program's main loop.
|
|
|
|
Returns
|
|
-------
|
|
fps : float
|
|
Estimated frames-per-second. This value is low-pass filtered
|
|
to reduce noise.
|
|
"""
|
|
global _time_prev, _fps
|
|
time_now = time.time() * 1000.0
|
|
dt = time_now - _time_prev
|
|
_time_prev = time_now
|
|
if dt == 0.0:
|
|
return _fps.value
|
|
return _fps.update(1000.0 / dt)
|
|
|
|
|
|
def memoize(function):
|
|
"""Provides a decorator for memoizing functions"""
|
|
from functools import wraps
|
|
memo = {}
|
|
|
|
@wraps(function)
|
|
def wrapper(*args):
|
|
if args in memo:
|
|
return memo[args]
|
|
else:
|
|
rv = function(*args)
|
|
memo[args] = rv
|
|
return rv
|
|
return wrapper
|
|
|
|
|
|
@memoize
|
|
def _normalized_linspace(size):
|
|
return np.linspace(0, 1, size)
|
|
|
|
|
|
def interpolate(y, new_length):
|
|
"""Intelligently resizes the array by linearly interpolating the values
|
|
|
|
Parameters
|
|
----------
|
|
y : np.array
|
|
Array that should be resized
|
|
|
|
new_length : int
|
|
The length of the new interpolated array
|
|
|
|
Returns
|
|
-------
|
|
z : np.array
|
|
New array with length of new_length that contains the interpolated
|
|
values of y.
|
|
"""
|
|
if len(y) == new_length:
|
|
return y
|
|
x_old = _normalized_linspace(len(y))
|
|
x_new = _normalized_linspace(new_length)
|
|
z = np.interp(x_new, x_old, y)
|
|
return z
|
|
|
|
|
|
r_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.2, alpha_rise=0.99)
|
|
g_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.05, alpha_rise=0.3)
|
|
b_filt = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.1, alpha_rise=0.5)
|
|
common_mode = dsp.ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
|
|
alpha_decay=0.99, alpha_rise=0.01)
|
|
p_filt = dsp.ExpFilter(np.tile(1, (3, config.N_PIXELS // 2)),
|
|
alpha_decay=0.1, alpha_rise=0.99)
|
|
p = np.tile(1.0, (3, config.N_PIXELS // 2))
|
|
gain = dsp.ExpFilter(np.tile(0.01, config.N_FFT_BINS),
|
|
alpha_decay=0.001, alpha_rise=0.99)
|
|
|
|
|
|
def visualize_scroll(y):
|
|
"""Effect that originates in the center and scrolls outwards"""
|
|
global p
|
|
y = y**2.0
|
|
gain.update(y)
|
|
y /= gain.value
|
|
y *= 255.0
|
|
r = int(np.max(y[:len(y) // 3]))
|
|
g = int(np.max(y[len(y) // 3: 2 * len(y) // 3]))
|
|
b = int(np.max(y[2 * len(y) // 3:]))
|
|
# Scrolling effect window
|
|
p[:, 1:] = p[:, :-1]
|
|
p *= 0.98
|
|
p = gaussian_filter1d(p, sigma=0.2)
|
|
# Create new color originating at the center
|
|
p[0, 0] = r
|
|
p[1, 0] = g
|
|
p[2, 0] = b
|
|
# Update the LED strip
|
|
return np.concatenate((p[:, ::-1], p), axis=1)
|
|
|
|
|
|
def visualize_energy(y):
|
|
"""Effect that expands from the center with increasing sound energy"""
|
|
global p
|
|
y = np.copy(y)
|
|
gain.update(y)
|
|
y /= gain.value
|
|
# Scale by the width of the LED strip
|
|
y *= float((config.N_PIXELS // 2) - 1)
|
|
# Map color channels according to energy in the different freq bands
|
|
scale = 0.9
|
|
r = int(np.mean(y[:len(y) // 3]**scale))
|
|
g = int(np.mean(y[len(y) // 3: 2 * len(y) // 3]**scale))
|
|
b = int(np.mean(y[2 * len(y) // 3:]**scale))
|
|
# Assign color to different frequency regions
|
|
p[0, :r] = 255.0
|
|
p[0, r:] = 0.0
|
|
p[1, :g] = 255.0
|
|
p[1, g:] = 0.0
|
|
p[2, :b] = 255.0
|
|
p[2, b:] = 0.0
|
|
p_filt.update(p)
|
|
p = np.round(p_filt.value)
|
|
# Apply substantial blur to smooth the edges
|
|
p[0, :] = gaussian_filter1d(p[0, :], sigma=4.0)
|
|
p[1, :] = gaussian_filter1d(p[1, :], sigma=4.0)
|
|
p[2, :] = gaussian_filter1d(p[2, :], sigma=4.0)
|
|
# Set the new pixel value
|
|
return np.concatenate((p[:, ::-1], p), axis=1)
|
|
|
|
|
|
_prev_spectrum = np.tile(0.01, config.N_PIXELS // 2)
|
|
|
|
|
|
def visualize_spectrum(y):
|
|
"""Effect that maps the Mel filterbank frequencies onto the LED strip"""
|
|
global _prev_spectrum
|
|
y = np.copy(interpolate(y, config.N_PIXELS // 2))
|
|
common_mode.update(y)
|
|
diff = y - _prev_spectrum
|
|
_prev_spectrum = np.copy(y)
|
|
# Color channel mappings
|
|
r = r_filt.update(y - common_mode.value)
|
|
g = np.abs(diff)
|
|
b = b_filt.update(np.copy(y))
|
|
# Mirror the color channels for symmetric output
|
|
r = np.concatenate((r[::-1], r))
|
|
g = np.concatenate((g[::-1], g))
|
|
b = np.concatenate((b[::-1], b))
|
|
output = np.array([r, g,b]) * 255
|
|
return output
|
|
|
|
|
|
fft_plot_filter = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.5, alpha_rise=0.99)
|
|
mel_gain = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.01, alpha_rise=0.99)
|
|
mel_smoothing = dsp.ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
|
|
alpha_decay=0.5, alpha_rise=0.99)
|
|
volume = dsp.ExpFilter(config.MIN_VOLUME_THRESHOLD,
|
|
alpha_decay=0.02, alpha_rise=0.02)
|
|
fft_window = np.hamming(int(config.MIC_RATE / config.FPS) * config.N_ROLLING_HISTORY)
|
|
prev_fps_update = time.time()
|
|
|
|
|
|
def microphone_update(audio_samples):
|
|
global y_roll, prev_rms, prev_exp, prev_fps_update
|
|
# Normalize samples between 0 and 1
|
|
y = audio_samples / 2.0**15
|
|
# Construct a rolling window of audio samples
|
|
y_roll[:-1] = y_roll[1:]
|
|
y_roll[-1, :] = np.copy(y)
|
|
y_data = np.concatenate(y_roll, axis=0).astype(np.float32)
|
|
|
|
vol = np.max(np.abs(y_data))
|
|
if vol < config.MIN_VOLUME_THRESHOLD:
|
|
print('No audio input. Volume below threshold. Volume:', vol)
|
|
led.pixels = np.tile(0, (3, config.N_PIXELS))
|
|
led.update()
|
|
else:
|
|
# Transform audio input into the frequency domain
|
|
N = len(y_data)
|
|
N_zeros = 2**int(np.ceil(np.log2(N))) - N
|
|
# Pad with zeros until the next power of two
|
|
y_data *= fft_window
|
|
y_padded = np.pad(y_data, (0, N_zeros), mode='constant')
|
|
YS = np.abs(np.fft.rfft(y_padded)[:N // 2])
|
|
# Construct a Mel filterbank from the FFT data
|
|
mel = np.atleast_2d(YS).T * dsp.mel_y.T
|
|
# Scale data to values more suitable for visualization
|
|
# mel = np.sum(mel, axis=0)
|
|
mel = np.sum(mel, axis=0)
|
|
mel = mel**2.0
|
|
# Gain normalization
|
|
mel_gain.update(np.max(gaussian_filter1d(mel, sigma=1.0)))
|
|
mel /= mel_gain.value
|
|
mel = mel_smoothing.update(mel)
|
|
# Map filterbank output onto LED strip
|
|
output = visualization_effect(mel)
|
|
led.pixels = output
|
|
led.update()
|
|
if config.USE_GUI:
|
|
# Plot filterbank output
|
|
x = np.linspace(config.MIN_FREQUENCY, config.MAX_FREQUENCY, len(mel))
|
|
mel_curve.setData(x=x, y=fft_plot_filter.update(mel))
|
|
# Plot the color channels
|
|
r_curve.setData(y=led.pixels[0])
|
|
g_curve.setData(y=led.pixels[1])
|
|
b_curve.setData(y=led.pixels[2])
|
|
if config.USE_GUI:
|
|
app.processEvents()
|
|
|
|
if config.DISPLAY_FPS:
|
|
fps = frames_per_second()
|
|
if time.time() - 0.5 > prev_fps_update:
|
|
prev_fps_update = time.time()
|
|
print('FPS {:.0f} / {:.0f}'.format(fps, config.FPS))
|
|
|
|
|
|
# Number of audio samples to read every time frame
|
|
samples_per_frame = int(config.MIC_RATE / config.FPS)
|
|
|
|
# Array containing the rolling audio sample window
|
|
y_roll = np.random.rand(config.N_ROLLING_HISTORY, samples_per_frame) / 1e16
|
|
|
|
visualization_effect = visualize_spectrum
|
|
"""Visualization effect to display on the LED strip"""
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if config.USE_GUI:
|
|
import pyqtgraph as pg
|
|
from pyqtgraph.Qt import QtGui, QtCore
|
|
# Create GUI window
|
|
app = QtGui.QApplication([])
|
|
view = pg.GraphicsView()
|
|
layout = pg.GraphicsLayout(border=(100,100,100))
|
|
view.setCentralItem(layout)
|
|
view.show()
|
|
view.setWindowTitle('Visualization')
|
|
view.resize(800,600)
|
|
# Mel filterbank plot
|
|
fft_plot = layout.addPlot(title='Filterbank Output', colspan=3)
|
|
fft_plot.setRange(yRange=[-0.1, 1.2])
|
|
fft_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
|
|
x_data = np.array(range(1, config.N_FFT_BINS + 1))
|
|
mel_curve = pg.PlotCurveItem()
|
|
mel_curve.setData(x=x_data, y=x_data*0)
|
|
fft_plot.addItem(mel_curve)
|
|
# Visualization plot
|
|
layout.nextRow()
|
|
led_plot = layout.addPlot(title='Visualization Output', colspan=3)
|
|
led_plot.setRange(yRange=[-5, 260])
|
|
led_plot.disableAutoRange(axis=pg.ViewBox.YAxis)
|
|
# Pen for each of the color channel curves
|
|
r_pen = pg.mkPen((255, 30, 30, 200), width=4)
|
|
g_pen = pg.mkPen((30, 255, 30, 200), width=4)
|
|
b_pen = pg.mkPen((30, 30, 255, 200), width=4)
|
|
# Color channel curves
|
|
r_curve = pg.PlotCurveItem(pen=r_pen)
|
|
g_curve = pg.PlotCurveItem(pen=g_pen)
|
|
b_curve = pg.PlotCurveItem(pen=b_pen)
|
|
# Define x data
|
|
x_data = np.array(range(1, config.N_PIXELS + 1))
|
|
r_curve.setData(x=x_data, y=x_data*0)
|
|
g_curve.setData(x=x_data, y=x_data*0)
|
|
b_curve.setData(x=x_data, y=x_data*0)
|
|
# Add curves to plot
|
|
led_plot.addItem(r_curve)
|
|
led_plot.addItem(g_curve)
|
|
led_plot.addItem(b_curve)
|
|
# Frequency range label
|
|
freq_label = pg.LabelItem('')
|
|
# Frequency slider
|
|
def freq_slider_change(tick):
|
|
minf = freq_slider.tickValue(0)**2.0 * (config.MIC_RATE / 2.0)
|
|
maxf = freq_slider.tickValue(1)**2.0 * (config.MIC_RATE / 2.0)
|
|
t = 'Frequency range: {:.0f} - {:.0f} Hz'.format(minf, maxf)
|
|
freq_label.setText(t)
|
|
config.MIN_FREQUENCY = minf
|
|
config.MAX_FREQUENCY = maxf
|
|
dsp.create_mel_bank()
|
|
freq_slider = pg.TickSliderItem(orientation='bottom', allowAdd=False)
|
|
freq_slider.addTick((config.MIN_FREQUENCY / (config.MIC_RATE / 2.0))**0.5)
|
|
freq_slider.addTick((config.MAX_FREQUENCY / (config.MIC_RATE / 2.0))**0.5)
|
|
freq_slider.tickMoveFinished = freq_slider_change
|
|
freq_label.setText('Frequency range: {} - {} Hz'.format(
|
|
config.MIN_FREQUENCY,
|
|
config.MAX_FREQUENCY))
|
|
# Effect selection
|
|
active_color = '#16dbeb'
|
|
inactive_color = '#FFFFFF'
|
|
def energy_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_energy
|
|
energy_label.setText('Energy', color=active_color)
|
|
scroll_label.setText('Scroll', color=inactive_color)
|
|
spectrum_label.setText('Spectrum', color=inactive_color)
|
|
def scroll_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_scroll
|
|
energy_label.setText('Energy', color=inactive_color)
|
|
scroll_label.setText('Scroll', color=active_color)
|
|
spectrum_label.setText('Spectrum', color=inactive_color)
|
|
def spectrum_click(x):
|
|
global visualization_effect
|
|
visualization_effect = visualize_spectrum
|
|
energy_label.setText('Energy', color=inactive_color)
|
|
scroll_label.setText('Scroll', color=inactive_color)
|
|
spectrum_label.setText('Spectrum', color=active_color)
|
|
# Create effect "buttons" (labels with click event)
|
|
energy_label = pg.LabelItem('Energy')
|
|
scroll_label = pg.LabelItem('Scroll')
|
|
spectrum_label = pg.LabelItem('Spectrum')
|
|
energy_label.mousePressEvent = energy_click
|
|
scroll_label.mousePressEvent = scroll_click
|
|
spectrum_label.mousePressEvent = spectrum_click
|
|
energy_click(0)
|
|
# Layout
|
|
layout.nextRow()
|
|
layout.addItem(freq_label, colspan=3)
|
|
layout.nextRow()
|
|
layout.addItem(freq_slider, colspan=3)
|
|
layout.nextRow()
|
|
layout.addItem(energy_label)
|
|
layout.addItem(scroll_label)
|
|
layout.addItem(spectrum_label)
|
|
# Initialize LEDs
|
|
led.update()
|
|
# Start listening to live audio stream
|
|
microphone.start_stream(microphone_update)
|