Major refactoring and update

* Moved all module settings to a new config.py file
* Completely overhauled visualize.py and added a new radiate effect that
colours the radiating beats according to the beat frequency.
* Made constants such as the decay constant parametric so that they scale
to any LED strip size.
* Added temporal dithering to Beat.update_pixels() so that it now
supports fractional speed values. Being limited to integer values was
starting to become a problem (see the dithering sketch after this list).
* Overhauled and simplified the LED module.
* When updating pixels, the LED module no longer sends UDP packets for
pixels that have not changed. This optimization reduces the packet load
significantly and should allow for higher refresh rates.
* Renamed lookup_table.npy to gamma_table.npy to better reflect that the
table is used for gamma correction of the LED strip.
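
Roughly how the temporal dithering mentioned above works, as a minimal,
self-contained sketch (the function name and example values are illustrative,
not from the repo; the real implementation is in Beat.update_pixels() in the
visualize.py diff below):

    import numpy as np

    def dithered_roll(pixels, speed):
        """Roll `pixels` by a fractional amount using temporal dithering.

        The integer part of `speed` is always applied; the fractional part
        is applied probabilistically, so the average shift per frame
        equals `speed`.
        """
        shift = int(speed)
        # With probability equal to the fractional remainder, shift one extra pixel
        if np.random.random() < speed - shift:
            shift += 1
        rolled = np.roll(pixels, shift, axis=0)
        rolled[:shift] = 0.0
        return rolled

    # Example: speed=1.25 shifts 1 pixel on ~75% of frames and 2 pixels on ~25%
    print(dithered_roll(np.arange(10, dtype=float), speed=1.25))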
Scott Lawson 2016-10-13 22:27:45 -07:00
parent a3a986ed0b
commit 17313c254b
7 changed files with 284 additions and 150 deletions

ESP8266 firmware (.ino sketch)

@ -54,6 +54,9 @@ void loop() {
pixels[N].G = (uint8_t)packetBuffer[i+2];
pixels[N].B = (uint8_t)packetBuffer[i+3];
}
//ledstrip.show(pixels);
}
// Always update strip to improve temporal dithering performance
ledstrip.show(pixels);
}
}
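
For reference, the UDP message parsed by the firmware hunk above consists of
one 4-byte record per changed pixel: an index byte followed by R, G and B
bytes (packetBuffer[i], [i+1], [i+2], [i+3]), which is the format built by
led.update() further down. A hedged sketch of the decoding step, written in
Python purely for illustration (the real decoding happens in the ESP8266
firmware):

    def decode_pixel_packet(packet, pixels):
        """Apply a delta-update packet (index, R, G, B per changed pixel)."""
        data = bytearray(packet)
        for i in range(0, len(data), 4):
            n = data[i]
            pixels[n] = (data[i + 1], data[i + 2], data[i + 3])
        return pixels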

python/config.py (new file, +84 lines)

@ -0,0 +1,84 @@
"""Settings for audio reactive LED strip"""
import os
N_PIXELS = 240
"""Number of pixels in the LED strip (must match ESP8266 firmware)"""
GAMMA_TABLE_PATH = os.path.join(os.path.dirname(__file__), 'gamma_table.npy')
"""Location of the gamma correction table"""
UDP_IP = '192.168.0.100'
"""IP address of the ESP8266"""
UDP_PORT = 7777
"""Port number used for socket communication between Python and ESP8266"""
MIC_RATE = 44100
"""Sampling frequency of the microphone in Hz"""
FPS = 66
"""Desired LED strip update rate in frames (updates) per second
This is the desired update rate of the LED strip. The actual refresh rate of
the LED strip may be lower if the time needed for signal processing exceeds
the per-frame recording time.
A high FPS results in low latency and smooth animations, but it also reduces
the duration of the short-time Fourier transform. This can negatively affect
low frequency (bass) response.
"""
ENERGY_THRESHOLD = 5.5
"""Energy threshold for determining whether a beat has been detected
One aspect of beat detection is comparing the current energy of a frequency
subband to the average energy of the subband over some time interval. Beats
are often associated with large spikes in energy relative to the recent
average energy.
ENERGY_THRESHOLD is the threshold used to determine if the energy spike is
sufficiently large to be considered a beat.
For example, if ENERGY_THRESHOLD = 2, then a beat is detected if the current
frequency subband energy is more than 2 times the recent average energy.
"""
VARIANCE_THRESHOLD = 10.0
"""Variance threshold for determining whether a beat has been detected
Beat detection is largely determined by the ENERGY_THRESHOLD, but we can also
require frequency bands to have a certain minimum variance over some past
time interval before a beat can be detected.
One downside to using a variance threshold is that it is an absolute threshold
which is affected by the current volume.
"""
N_SUBBANDS = 128
"""Number of frequency bins to use for beat detection
More subbands improve beat detection sensitivity but it may become more
challenging for the visualization to work for a wide range of music.
Fewer subbands reduce signal processing time at the expense of beat detection
sensitivity.
"""
N_HISTORY = int(1.2 * FPS)
"""Number of previous samples to consider when doing beat detection
Beats are detected by comparing the most recent audio recording to a collection
of previous audio recordings. This is the number of previous audio recordings
to consider when doing beat detection.
For example, setting N_HISTORY = int(1.0 * FPS) means that one second
of previous audio recordings will be used for beat detection.
Smaller values reduce signal processing time, but values that are too small
may reduce beat detection accuracy. Larger values increase signal processing
time, and values that are too large can also reduce accuracy. Roughly one second
of previous data tends to work well.
"""
GAMMA_CORRECTION = True
"""Whether to correct LED brightness for nonlinear brightness perception"""

python/dsp.py

@ -7,52 +7,23 @@ matplotlib.use('TkAgg')
import matplotlib.pylab as plt
plt.style.use('lawson')
import microphone as mic
# Number of frequency bands used for beat detection
N_subbands = 64
import config
# FFT statistics for a few previous updates
N_history = int(1.0 * mic.FPS)
ys_historical_energy = np.zeros(shape=(N_subbands, N_history))
ys_beat_threshold = 6.0
ys_variance_threshold = 0.0
# def A_weighting(fs):
# """Design of an A-weighting filter.
# b, a = A_weighting(fs) designs a digital A-weighting filter for
# sampling frequency `fs`. Usage: y = scipy.signal.lfilter(b, a, x).
# Warning: `fs` should normally be higher than 20 kHz. For example,
# fs = 48000 yields a class 1-compliant filter.
# References:
# [1] IEC/CD 1672: Electroacoustics-Sound Level Meters, Nov. 1996.
# """
# # Definition of analog A-weighting filter according to IEC/CD 1672.
# f1 = 20.598997
# f2 = 107.65265
# f3 = 737.86223
# f4 = 12194.217
# A1000 = 1.9997
# NUMs = [(2 * np.pi * f4)**2 * (10**(A1000 / 20)), 0, 0, 0, 0]
# DENs = np.polymul([1, 4 * np.pi * f4, (2 * np.pi * f4)**2],
# [1, 4 * np.pi * f1, (2 * np.pi * f1)**2])
# DENs = np.polymul(np.polymul(DENs, [1, 2 * np.pi * f3]),
# [1, 2 * np.pi * f2])
# # Use the bilinear transformation to get the digital filter.
# # (Octave, MATLAB, and PyLab disagree about Fs vs 1/Fs)
# return bilinear(NUMs, DENs, fs)
_ys_historical_energy = np.zeros(shape=(config.N_SUBBANDS, config.N_HISTORY))
def beat_detect(ys):
global ys_historical_energy
global _ys_historical_energy
# Beat energy criterion
current_energy = ys * ys
mean_energy = np.mean(ys_historical_energy, axis=1)
has_beat_energy = current_energy > mean_energy * ys_beat_threshold
ys_historical_energy = np.roll(ys_historical_energy, shift=1, axis=1)
ys_historical_energy[:, 0] = current_energy
mean_energy = np.mean(_ys_historical_energy, axis=1)
has_beat_energy = current_energy > mean_energy * config.ENERGY_THRESHOLD
_ys_historical_energy = np.roll(_ys_historical_energy, shift=1, axis=1)
_ys_historical_energy[:, 0] = current_energy
# Beat variance criterion
ys_variance = np.var(ys_historical_energy, axis=1)
has_beat_variance = ys_variance > ys_variance_threshold
ys_variance = np.var(_ys_historical_energy, axis=1)
has_beat_variance = ys_variance > config.VARIANCE_THRESHOLD
# Combined energy + variance detection
has_beat = has_beat_energy * has_beat_variance
return has_beat
@ -62,10 +33,19 @@ def fft(data):
"""Returns |fft(data)|"""
yL, yR = np.split(np.abs(np.fft.fft(data)), 2)
ys = np.add(yL, yR[::-1])
xs = np.arange(mic.CHUNK / 2, dtype=float) * float(mic.RATE) / mic.CHUNK
xs = np.arange(int(config.MIC_RATE / config.FPS) / 2, dtype=float)
xs *= float(config.MIC_RATE) / int(config.MIC_RATE / config.FPS)
return xs, ys
# def fft(data):
# """Returns |fft(data)|"""
# yL, yR = np.split(np.abs(np.fft.fft(data)), 2)
# ys = np.add(yL, yR[::-1])
# xs = np.arange(mic.CHUNK / 2, dtype=float) * float(mic.RATE) / mic.CHUNK
# return xs, ys
def fft_log_partition(data, fmin=30, fmax=20000, subbands=64):
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
xs, ys = fft(data)
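
The hunk cuts off before the body of fft_log_partition(). As a rough
illustration of what logarithmic partitioning involves (not necessarily the
exact implementation used in dsp.py), subband edges can be spaced with
np.logspace and the FFT bins summed within each band:

    import numpy as np

    def log_partition_sketch(xs, ys, fmin=30, fmax=20000, subbands=64):
        """Group FFT magnitudes ys (at frequencies xs) into log-spaced subbands.

        Illustrative sketch only; dsp.fft_log_partition() may differ in details
        such as interpolation or normalization.
        """
        edges = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands + 1)
        band_energy = np.zeros(subbands)
        for i in range(subbands):
            in_band = (xs >= edges[i]) & (xs < edges[i + 1])
            band_energy[i] = ys[in_band].sum()
        return edges[:-1], band_energy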

python/led.py

@ -2,76 +2,92 @@ from __future__ import print_function
import time
import socket
import numpy as np
import config
# Nonlinear brightness correction
lookup_table = np.load('lookup_table.npy')
N_pixels = 240
m = None
_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
_gamma = np.load('gamma_table.npy')
_prev_pixels = np.tile(0, (config.N_PIXELS, 3))
# Socket communication settings
UDP_IP = "192.168.0.100"
UDP_PORT = 7777
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
pixels = np.tile(0, (config.N_PIXELS, 3))
"""Array containing the pixel values for the LED strip"""
def set_all(R, G, B):
for i in range(N_pixels):
set_pixel(i, R, G, B)
update_pixels()
def update():
global pixels, _prev_pixels
pixels = np.clip(pixels, 0, 255)
m = ''
for i in range(config.N_PIXELS):
# Ignore pixels if they haven't changed (saves bandwidth)
if np.array_equal(pixels[i], _prev_pixels[i]):
continue
r = _gamma[pixels[i][0]] if config.GAMMA_CORRECTION else pixels[i][0]
g = _gamma[pixels[i][1]] if config.GAMMA_CORRECTION else pixels[i][1]
b = _gamma[pixels[i][2]] if config.GAMMA_CORRECTION else pixels[i][2]
m += chr(i) + chr(r) + chr(g) + chr(b)
_prev_pixels = pixels
_sock.sendto(m, (config.UDP_IP, config.UDP_PORT))
def set_from_array(x):
dt = 2.0 * np.pi / N_pixels
t = time.time() * 1.5
def r(t): return (np.sin(t + 0.0) + 1.0) * 1.0 / 2.0
def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
for n in range(N_pixels):
set_pixel(N=n,
R=r(n * dt + t) * x[n],
G=g(n * dt + t) * x[n],
B=b(n * dt + t) * x[n],
nonlinear_correction=True)
update_pixels()
# def set_all(R, G, B):
# for i in range(config.N_PIXELS):
# set_pixel(i, R, G, B)
# update_pixels()
def set_pixel(N, R, G, B, nonlinear_correction=True):
global m
r = int(min(max(R, 0), 255))
g = int(min(max(G, 0), 255))
b = int(min(max(B, 0), 255))
if nonlinear_correction:
r = lookup_table[r]
g = lookup_table[g]
b = lookup_table[b]
if m is None:
m = chr(N) + chr(r) + chr(g) + chr(b)
else:
m += chr(N) + chr(r) + chr(g) + chr(b)
# def autocolor(x, speed=1.0):
# dt = 2.0 * np.pi / config.N_PIXELS
# t = time.time() * speed
# def r(t): return (np.sin(t + 0.0) + 1.0) * 1.0 / 2.0
# def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
# def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
# for n in range(config.N_PIXELS):
# set_pixel(N=n,
# R=r(n * dt + t) * x[n],
# G=g(n * dt + t) * x[n],
# B=b(n * dt + t) * x[n],
# gamma_correction=True)
# update_pixels()
def update_pixels():
global m
sock.sendto(m, (UDP_IP, UDP_PORT))
m = None
# def set_pixel(N, R, G, B, gamma_correction=True):
# global _m
# r = int(min(max(R, 0), 255))
# g = int(min(max(G, 0), 255))
# b = int(min(max(B, 0), 255))
# if gamma_correction:
# r = _gamma_table[r]
# g = _gamma_table[g]
# b = _gamma_table[b]
# if _m is None:
# _m = chr(N) + chr(r) + chr(g) + chr(b)
# else:
# _m += chr(N) + chr(r) + chr(g) + chr(b)
def rainbow(brightness=255.0, speed=1.0, fps=10):
offset = 132
dt = 2.0 * np.pi / N_pixels
def r(t): return (np.sin(t + 0.0) + 1.0) * brightness / 2.0 + offset
def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
while True:
t = time.time() * speed
for n in range(N_pixels):
T = t + n * dt
set_pixel(N=n, R=r(T), G=g(T), B=b(T))
update_pixels()
time.sleep(1.0 / fps)
# def update_pixels():
# global _m
# _sock.sendto(_m, (config.UDP_IP, config.UDP_PORT))
# _m = None
# def rainbow(brightness=255.0, speed=1.0, fps=10):
# offset = 132
# dt = 2.0 * np.pi / config.N_PIXELS
# def r(t): return (np.sin(t + 0.0) + 1.0) * brightness / 2.0 + offset
# def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
# def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
# while True:
# t = time.time() * speed
# for n in range(config.N_PIXELS):
# T = t + n * dt
# set_pixel(N=n, R=r(T), G=g(T), B=b(T))
# update_pixels()
# time.sleep(1.0 / fps)
if __name__ == '__main__':
for i in range(N_pixels):
set_all(0, 0, 0)
while True:
update()
#set_all(0, 0, 0)
# rainbow(speed=0.025, fps=40, brightness=0)

python/microphone.py

@ -1,17 +1,15 @@
import pyaudio
import config
RATE = 44100
FPS = 40
CHUNK = int(RATE / FPS)
CHUNK = int(config.MIC_RATE / config.FPS)
def start_stream(callback):
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=RATE,
rate=config.MIC_RATE,
input=True,
frames_per_buffer=CHUNK)
frames_per_buffer=int(config.MIC_RATE / config.FPS))
while True:
callback(stream)
stream.stop_stream()

python/visualize.py

@ -1,84 +1,137 @@
from __future__ import print_function
import time
import numpy as np
from scipy.ndimage.filters import gaussian_filter1d
import config
import dsp
import led
import microphone as mic
class Beat:
def __init__(self, pixels, speed, direction):
def __init__(self, pixels, speed):
self.pixels = pixels
self.speed = float(speed)
self.zeros = np.zeros(len(pixels))
self.iteration = 0
self.direction = direction
def update_pixels(self):
self.iteration += 1
self.speed = max(0.95 * self.speed, 1.0)
self.pixels = np.roll(self.pixels, int(self.speed))
self.pixels[:int(self.speed)] = 0.0
s = 1.5 * self.iteration / (led.N_pixels / 2.0)
self.pixels = gaussian_filter1d(self.pixels, s, mode='constant')
self.pixels = np.round(self.pixels, decimals=1)
# Roll the pixel values to the right
# Temporal dithering is used to support fractional speed values
roll = int(self.speed)
roll += 1 if np.random.random() < self.speed - roll else 0
self.pixels = np.roll(self.pixels, roll, axis=0)
self.pixels[:roll] *= 0.0
# Apply Gaussian blur to create a dispersion effect
# Dispersion increases in strength over time
sigma = (2. * .14 * self.iteration / (config.N_PIXELS * self.speed))**4.
self.pixels = gaussian_filter1d(self.pixels, sigma, mode='constant')
# Exponentially decay the brightness over time
# The decay helps to direct viewer's focus to newer and brighter beats
self.pixels *= np.exp(2. * np.log(.1) / (self.speed * config.N_PIXELS))
self.pixels = np.round(self.pixels, decimals=2)
self.pixels = np.clip(self.pixels, 0, 255)
def finished(self):
return (self.pixels == self.zeros).all()
return np.array_equal(self.pixels, self.pixels * 0.0)
def radiate_effect(beats, max_speed=3, max_length=24):
def rainbow(speed=1.0 / 5.0):
# Note: assumes array is N_PIXELS / 2 long
dt = np.pi / config.N_PIXELS
t = time.time() * speed
def r(t): return (np.sin(t + 0.0) + 1.0) * 1.0 / 2.0
def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
x = np.tile(0.0, (config.N_PIXELS, 3))
for i in range(config.N_PIXELS):
x[i][0] = r(i * dt + t)
x[i][1] = g(i * dt + t)
x[i][2] = b(i * dt + t)
return x
def radiate(beats, beat_speed=1.0, max_length=26, min_beats=1):
N_beats = len(beats[beats == True])
# Add new beat if beats were detected
if N_beats > 0:
if N_beats > 0 and N_beats >= min_beats:
# Beat properties
beat_power = float(N_beats) / dsp.N_subbands
beat_speed = min(N_beats, max_speed)
beat_brightness = min(beat_power * 255.0, 255.0)
beat_power = float(N_beats) / config.N_SUBBANDS
beat_brightness = min(beat_power * 40.0, 255.0)
beat_brightness = max(beat_brightness, 40)
beat_length = int(np.sqrt(beat_power) * max_length)
beat_direction = not radiate_effect.previous_direction
beat_length = max(beat_length, 2)
# Beat pixels
beat_pixels = np.zeros(led.N_pixels / 2)
beat_pixels = np.zeros(config.N_PIXELS / 2)
beat_pixels[:beat_length] = beat_brightness
# Create and add the new beat
beat = Beat(beat_pixels, beat_speed, beat_direction)
radiate_effect.previous_direction = beat_direction
radiate_effect.beats = np.append(radiate_effect.beats, beat)
beat = Beat(beat_pixels, beat_speed)
radiate.beats = np.append(radiate.beats, beat)
# Pixels that will be displayed on the LED strip
pixels_L = np.zeros(led.N_pixels / 2)
pixels_R = np.zeros(led.N_pixels / 2)
for beat in radiate_effect.beats:
if beat.direction:
pixels_L += beat.pixels
else:
pixels_R += beat.pixels
beat.update_pixels()
pixels = np.zeros(config.N_PIXELS / 2)
if len(radiate.beats):
pixels += sum([b.pixels for b in radiate.beats])
for b in radiate.beats:
b.update_pixels()
# Only keep the beats that are still visible on the strip
radiate_effect.beats = [b for b in radiate_effect.beats if not b.finished()]
# Enforce value limits
pixels_L = np.clip(pixels_L, 0.0, 255.0)
pixels_R = np.clip(pixels_R, 0.0, 255.0)
# Update the LED values
led.set_from_array(np.append(pixels_L[::-1], pixels_R))
radiate.beats = [b for b in radiate.beats if not b.finished()]
pixels = np.append(pixels[::-1], pixels)
pixels = np.clip(pixels, 0.0, 255.0)
pixels = (pixels * rainbow().T).T
pixels = np.round(pixels).astype(int)
led.pixels = pixels
led.update()
def radiate2(beats, beat_speed=0.8, max_length=26, min_beats=1):
N_beats = len(beats[beats == True])
if N_beats > 0 and N_beats >= min_beats:
index_to_color = rainbow()
# Beat properties
beat_power = float(N_beats) / config.N_SUBBANDS
beat_brightness = np.round(256.0 / config.N_SUBBANDS)
beat_brightness *= np.sqrt(config.N_SUBBANDS / N_beats)
beat_length = int(np.sqrt(beat_power) * max_length)
beat_length = max(beat_length, 2)
beat_pixels = np.tile(0.0, (config.N_PIXELS / 2, 3))
for i in range(len(beats)):
if beats[i]:
beat_color = np.round(index_to_color[i] * beat_brightness)
beat_pixels[:beat_length] += beat_color
beat_pixels = np.clip(beat_pixels, 0.0, 255.0)
beat = Beat(beat_pixels, beat_speed)
radiate2.beats = np.append(radiate2.beats, beat)
# Pixels that will be displayed on the LED strip
pixels = np.zeros((config.N_PIXELS / 2, 3))
if len(radiate2.beats):
pixels += sum([b.pixels for b in radiate2.beats])
for b in radiate2.beats:
b.update_pixels()
radiate2.beats = [b for b in radiate2.beats if not b.finished()]
pixels = np.append(pixels[::-1], pixels, axis=0)
pixels = np.clip(pixels, 0.0, 255.0)
pixels = np.round(pixels).astype(int)
led.pixels = pixels
led.update()
def microphone_update(stream):
data = np.fromstring(stream.read(mic.CHUNK), dtype=np.int16) / (2.0**15)
#data = np.diff(data)
#data = np.append(data, data[-1])
xs, ys = dsp.fft_log_partition(data=data, subbands=dsp.N_subbands)
frames_per_buffer = int(config.MIC_RATE / config.FPS)
data = np.fromstring(stream.read(frames_per_buffer), dtype=np.int16)
data = data / 2.0**15
xs, ys = dsp.fft_log_partition(data=data, subbands=config.N_SUBBANDS)
beats = dsp.beat_detect(ys)
radiate_effect(beats)
radiate2(beats)
# Settings for beat detection
dsp.ys_beat_threshold = 1.8
dsp.ys_variance_threshold = 100.0
# Initial valeus for the radiate effect
radiate_effect.previous_direction = True
radiate_effect.beats = np.array([])
# Initial values for the radiate effect
radiate.beats = np.array([])
radiate2.beats = np.array([])
if __name__ == "__main__":
mic.start_stream(microphone_update)