Initial commit
Initial commit of working python and ESP8266 code.
This commit is contained in:
parent
4f5ab1556e
commit
028500f04e
59
arduino/ws2812_controller.ino/ws2812_controller.ino
Normal file
59
arduino/ws2812_controller.ino/ws2812_controller.ino
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
#include <Arduino.h>
|
||||||
|
#include <ESP8266WiFi.h>
|
||||||
|
#include <WebSocketsServer.h>
|
||||||
|
#include <Hash.h>
|
||||||
|
#include <WiFiUdp.h>
|
||||||
|
#include <ws2812_i2s.h>
|
||||||
|
|
||||||
|
#define NUM_LEDS 240
|
||||||
|
#define BUFFER_LEN 1024
|
||||||
|
|
||||||
|
// Wifi and socket settings
|
||||||
|
const char* ssid = "LAWSON-LINK-2.4";
|
||||||
|
const char* password = "felixlina10";
|
||||||
|
unsigned int localPort = 7777;
|
||||||
|
char packetBuffer[BUFFER_LEN];
|
||||||
|
|
||||||
|
// LED strip
|
||||||
|
static WS2812 ledstrip;
|
||||||
|
static Pixel_t pixels[NUM_LEDS];
|
||||||
|
WiFiUDP port;
|
||||||
|
|
||||||
|
void setup() {
|
||||||
|
Serial.begin(115200);
|
||||||
|
WiFi.begin(ssid, password);
|
||||||
|
Serial.println("");
|
||||||
|
|
||||||
|
// Connect to wifi and print the IP address over serial
|
||||||
|
while (WiFi.status() != WL_CONNECTED) {
|
||||||
|
delay(500);
|
||||||
|
Serial.print(".");
|
||||||
|
}
|
||||||
|
Serial.println("");
|
||||||
|
Serial.print("Connected to ");
|
||||||
|
Serial.println(ssid);
|
||||||
|
Serial.print("IP address: ");
|
||||||
|
Serial.println(WiFi.localIP());
|
||||||
|
port.begin(localPort);
|
||||||
|
ledstrip.init(NUM_LEDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t N = 0;
|
||||||
|
|
||||||
|
void loop() {
|
||||||
|
// Read data over socket
|
||||||
|
int packetSize = port.parsePacket();
|
||||||
|
|
||||||
|
// If packets have been received, interpret the command
|
||||||
|
if (packetSize) {
|
||||||
|
int len = port.read(packetBuffer, BUFFER_LEN);
|
||||||
|
for(int i = 0; i < len; i+=4){
|
||||||
|
packetBuffer[len] = 0;
|
||||||
|
N = packetBuffer[i];
|
||||||
|
pixels[N].R = (uint8_t)packetBuffer[i+1];
|
||||||
|
pixels[N].G = (uint8_t)packetBuffer[i+2];
|
||||||
|
pixels[N].B = (uint8_t)packetBuffer[i+3];
|
||||||
|
}
|
||||||
|
ledstrip.show(pixels);
|
||||||
|
}
|
||||||
|
}
|
79
python/dsp.py
Normal file
79
python/dsp.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
from __future__ import division
|
||||||
|
import numpy as np
|
||||||
|
from scipy.interpolate import interp1d
|
||||||
|
import matplotlib
|
||||||
|
matplotlib.use('TkAgg')
|
||||||
|
import matplotlib.pylab as plt
|
||||||
|
plt.style.use('lawson')
|
||||||
|
import microphone as mic
|
||||||
|
|
||||||
|
# Number of frequency bands used for beat detection
|
||||||
|
N_subbands = 64
|
||||||
|
|
||||||
|
# FFT statistics for a few previous updates
|
||||||
|
N_history = int(1.0 * mic.FPS)
|
||||||
|
ys_historical_energy = np.zeros(shape=(N_subbands, N_history))
|
||||||
|
ys_beat_threshold = 6.0
|
||||||
|
ys_variance_threshold = 0.0
|
||||||
|
|
||||||
|
# def A_weighting(fs):
|
||||||
|
# """Design of an A-weighting filter.
|
||||||
|
# b, a = A_weighting(fs) designs a digital A-weighting filter for
|
||||||
|
# sampling frequency `fs`. Usage: y = scipy.signal.lfilter(b, a, x).
|
||||||
|
# Warning: `fs` should normally be higher than 20 kHz. For example,
|
||||||
|
# fs = 48000 yields a class 1-compliant filter.
|
||||||
|
# References:
|
||||||
|
# [1] IEC/CD 1672: Electroacoustics-Sound Level Meters, Nov. 1996.
|
||||||
|
# """
|
||||||
|
# # Definition of analog A-weighting filter according to IEC/CD 1672.
|
||||||
|
# f1 = 20.598997
|
||||||
|
# f2 = 107.65265
|
||||||
|
# f3 = 737.86223
|
||||||
|
# f4 = 12194.217
|
||||||
|
# A1000 = 1.9997
|
||||||
|
# NUMs = [(2 * np.pi * f4)**2 * (10**(A1000 / 20)), 0, 0, 0, 0]
|
||||||
|
# DENs = np.polymul([1, 4 * np.pi * f4, (2 * np.pi * f4)**2],
|
||||||
|
# [1, 4 * np.pi * f1, (2 * np.pi * f1)**2])
|
||||||
|
# DENs = np.polymul(np.polymul(DENs, [1, 2 * np.pi * f3]),
|
||||||
|
# [1, 2 * np.pi * f2])
|
||||||
|
# # Use the bilinear transformation to get the digital filter.
|
||||||
|
# # (Octave, MATLAB, and PyLab disagree about Fs vs 1/Fs)
|
||||||
|
# return bilinear(NUMs, DENs, fs)
|
||||||
|
|
||||||
|
|
||||||
|
def beat_detect(ys):
|
||||||
|
global ys_historical_energy
|
||||||
|
# Beat energy criterion
|
||||||
|
current_energy = ys * ys
|
||||||
|
mean_energy = np.mean(ys_historical_energy, axis=1)
|
||||||
|
has_beat_energy = current_energy > mean_energy * ys_beat_threshold
|
||||||
|
ys_historical_energy = np.roll(ys_historical_energy, shift=1, axis=1)
|
||||||
|
ys_historical_energy[:, 0] = current_energy
|
||||||
|
# Beat variance criterion
|
||||||
|
ys_variance = np.var(ys_historical_energy, axis=1)
|
||||||
|
has_beat_variance = ys_variance > ys_variance_threshold
|
||||||
|
# Combined energy + variance detection
|
||||||
|
has_beat = has_beat_energy * has_beat_variance
|
||||||
|
return has_beat
|
||||||
|
|
||||||
|
|
||||||
|
def fft(data):
|
||||||
|
"""Returns |fft(data)|"""
|
||||||
|
yL, yR = np.split(np.abs(np.fft.fft(data)), 2)
|
||||||
|
ys = np.add(yL, yR[::-1])
|
||||||
|
xs = np.arange(mic.CHUNK / 2, dtype=float) * float(mic.RATE) / mic.CHUNK
|
||||||
|
return xs, ys
|
||||||
|
|
||||||
|
|
||||||
|
def fft_log_partition(data, fmin=30, fmax=20000, subbands=64):
|
||||||
|
"""Returns FFT partitioned into subbands that are logarithmically spaced"""
|
||||||
|
xs, ys = fft(data)
|
||||||
|
xs_log = np.logspace(np.log10(fmin), np.log10(fmax), num=subbands * 32)
|
||||||
|
f = interp1d(xs, ys)
|
||||||
|
ys_log = f(xs_log)
|
||||||
|
X, Y = [], []
|
||||||
|
for i in range(0, subbands * 32, 32):
|
||||||
|
X.append(np.mean(xs_log[i:i + 32]))
|
||||||
|
Y.append(np.mean(ys_log[i:i + 32]))
|
||||||
|
return np.array(X), np.array(Y)
|
76
python/led.py
Normal file
76
python/led.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
# Nonlinear brightness correction
|
||||||
|
lookup_table = np.load('lookup_table.npy')
|
||||||
|
N_pixels = 240
|
||||||
|
m = None
|
||||||
|
|
||||||
|
# Socket communication settings
|
||||||
|
UDP_IP = "192.168.0.100"
|
||||||
|
UDP_PORT = 7777
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
|
||||||
|
def set_all(R, G, B):
|
||||||
|
for i in range(N_pixels):
|
||||||
|
set_pixel(i, R, G, B)
|
||||||
|
update_pixels()
|
||||||
|
|
||||||
|
|
||||||
|
def set_from_array(x):
|
||||||
|
dt = 2.0 * np.pi / N_pixels
|
||||||
|
t = time.time() * 1.5
|
||||||
|
def r(t): return (np.sin(t + 0.0) + 1.0) * 1.0 / 2.0
|
||||||
|
def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
|
||||||
|
def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * 1.0 / 2.0
|
||||||
|
for n in range(N_pixels):
|
||||||
|
set_pixel(N=n,
|
||||||
|
R=r(n * dt + t) * x[n],
|
||||||
|
G=g(n * dt + t) * x[n],
|
||||||
|
B=b(n * dt + t) * x[n],
|
||||||
|
nonlinear_correction=True)
|
||||||
|
update_pixels()
|
||||||
|
|
||||||
|
|
||||||
|
def set_pixel(N, R, G, B, nonlinear_correction=True):
|
||||||
|
global m
|
||||||
|
r = int(min(max(R, 0), 255))
|
||||||
|
g = int(min(max(G, 0), 255))
|
||||||
|
b = int(min(max(B, 0), 255))
|
||||||
|
if nonlinear_correction:
|
||||||
|
r = lookup_table[r]
|
||||||
|
g = lookup_table[g]
|
||||||
|
b = lookup_table[b]
|
||||||
|
if m is None:
|
||||||
|
m = chr(N) + chr(r) + chr(g) + chr(b)
|
||||||
|
else:
|
||||||
|
m += chr(N) + chr(r) + chr(g) + chr(b)
|
||||||
|
|
||||||
|
|
||||||
|
def update_pixels():
|
||||||
|
global m
|
||||||
|
sock.sendto(m, (UDP_IP, UDP_PORT))
|
||||||
|
m = None
|
||||||
|
|
||||||
|
|
||||||
|
def rainbow(brightness=255.0, speed=1.0, fps=10):
|
||||||
|
offset = 132
|
||||||
|
dt = 2.0 * np.pi / N_pixels
|
||||||
|
def r(t): return (np.sin(t + 0.0) + 1.0) * brightness / 2.0 + offset
|
||||||
|
def g(t): return (np.sin(t + (2.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
|
||||||
|
def b(t): return (np.sin(t + (4.0 / 3.0) * np.pi) + 1.0) * brightness / 2.0 + offset
|
||||||
|
while True:
|
||||||
|
t = time.time()*speed
|
||||||
|
for n in range(N_pixels):
|
||||||
|
T = t + n * dt
|
||||||
|
set_pixel(N=n, R=r(T), G=g(T), B=b(T))
|
||||||
|
update_pixels()
|
||||||
|
time.sleep(1.0 / fps)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
for i in range(N_pixels):
|
||||||
|
set_all(0, 0, 0)
|
||||||
|
#rainbow(speed=0.025, fps=40, brightness=0)
|
19
python/microphone.py
Normal file
19
python/microphone.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import pyaudio
|
||||||
|
|
||||||
|
RATE = 44100
|
||||||
|
FPS = 40
|
||||||
|
CHUNK = int(RATE / FPS)
|
||||||
|
|
||||||
|
|
||||||
|
def start_stream(callback):
|
||||||
|
p = pyaudio.PyAudio()
|
||||||
|
stream = p.open(format=pyaudio.paInt16,
|
||||||
|
channels=1,
|
||||||
|
rate=RATE,
|
||||||
|
input=True,
|
||||||
|
frames_per_buffer=CHUNK)
|
||||||
|
while True:
|
||||||
|
callback(stream)
|
||||||
|
stream.stop_stream()
|
||||||
|
stream.close()
|
||||||
|
p.terminate()
|
103
python/visualize.py
Normal file
103
python/visualize.py
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
import time
|
||||||
|
import numpy as np
|
||||||
|
from scipy.ndimage.filters import gaussian_filter1d
|
||||||
|
import dsp
|
||||||
|
import led
|
||||||
|
import microphone as mic
|
||||||
|
|
||||||
|
# Settings for beat detection
|
||||||
|
dsp.ys_beat_threshold = 1.8
|
||||||
|
dsp.ys_variance_threshold = 0.1
|
||||||
|
|
||||||
|
# List of beats currently visible on the LED strip
|
||||||
|
visible_beats = np.array([])
|
||||||
|
|
||||||
|
class Beat:
|
||||||
|
def __init__(self, pixels, speed):
|
||||||
|
self.pixels = pixels
|
||||||
|
self.speed = float(speed)
|
||||||
|
self.zeros = np.zeros(len(pixels))
|
||||||
|
self.iteration = 0
|
||||||
|
|
||||||
|
def update_pixels(self):
|
||||||
|
self.iteration += 1
|
||||||
|
self.speed = max(0.95 * self.speed, 1.0)
|
||||||
|
self.pixels = np.roll(self.pixels, int(self.speed))
|
||||||
|
self.pixels[:int(self.speed)] = 0.0
|
||||||
|
s = self.iteration / led.N_pixels
|
||||||
|
self.pixels = gaussian_filter1d(self.pixels, s, mode='constant')
|
||||||
|
self.pixels = np.round(self.pixels, decimals=1)
|
||||||
|
|
||||||
|
def finished(self):
|
||||||
|
return (self.pixels == self.zeros).all()
|
||||||
|
|
||||||
|
|
||||||
|
prev_dir = True
|
||||||
|
def shooting_beats(beats):
|
||||||
|
global visible_beats
|
||||||
|
N_beats = len(beats[beats == True])
|
||||||
|
|
||||||
|
# Settings
|
||||||
|
max_speed = 3
|
||||||
|
max_length = 24
|
||||||
|
|
||||||
|
if N_beats > 0:
|
||||||
|
# Fraction of beats that have been detected
|
||||||
|
beat_power = float(N_beats) / dsp.N_subbands
|
||||||
|
# Speed
|
||||||
|
beat_speed = min(N_beats, max_speed)
|
||||||
|
# Brightness
|
||||||
|
beat_brightness = min(beat_power * 255.0, 255.0)
|
||||||
|
# Length
|
||||||
|
beat_length = int(np.sqrt(beat_power) * max_length)
|
||||||
|
|
||||||
|
# Pixels
|
||||||
|
beat_pixels = np.zeros(led.N_pixels / 2)
|
||||||
|
beat_pixels[:beat_length] = beat_brightness
|
||||||
|
beat_pixels = gaussian_filter1d(beat_pixels, 0.5, mode='reflect')
|
||||||
|
|
||||||
|
# Create the beat
|
||||||
|
beat = Beat(pixels=beat_pixels, speed=beat_speed)
|
||||||
|
# Assign direction
|
||||||
|
# beat.is_left = np.random.random() > 0.5
|
||||||
|
global prev_dir
|
||||||
|
beat.is_left = not prev_dir
|
||||||
|
prev_dir = not prev_dir
|
||||||
|
visible_beats = np.append(visible_beats, beat)
|
||||||
|
|
||||||
|
# Clear pixels and add beats
|
||||||
|
remaining_beats = []
|
||||||
|
pixels_L = np.zeros(led.N_pixels / 2)
|
||||||
|
pixels_R = np.zeros(led.N_pixels / 2)
|
||||||
|
for i in range(len(visible_beats)):
|
||||||
|
if visible_beats[i].is_left:
|
||||||
|
pixels_L += visible_beats[i].pixels
|
||||||
|
else:
|
||||||
|
pixels_R += visible_beats[i].pixels
|
||||||
|
visible_beats[i].update_pixels()
|
||||||
|
if not visible_beats[i].finished():
|
||||||
|
remaining_beats.append(visible_beats[i])
|
||||||
|
|
||||||
|
# Enforce value limits
|
||||||
|
pixels_L = np.clip(pixels_L, 0.0, 255.0)
|
||||||
|
pixels_R = np.clip(pixels_R, 0.0, 255.0)
|
||||||
|
# Only keep the beats that are still visible on the LED strip
|
||||||
|
visible_beats = np.array(remaining_beats)
|
||||||
|
# Update the LED values
|
||||||
|
led.set_from_array(np.append(pixels_L[::-1], pixels_R))
|
||||||
|
|
||||||
|
|
||||||
|
def microphone_update(stream):
|
||||||
|
data = np.fromstring(stream.read(mic.CHUNK), dtype=np.int16) / (2.0**15)
|
||||||
|
data = np.diff(data)
|
||||||
|
data = np.append(data, data[-1])
|
||||||
|
|
||||||
|
xs, ys = dsp.fft_log_partition(data=data, subbands=dsp.N_subbands)
|
||||||
|
beats = dsp.beat_detect(ys)
|
||||||
|
# print('Beats:', len(beats[beats == True]))
|
||||||
|
shooting_beats(beats)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
mic.start_stream(microphone_update)
|
Loading…
Reference in New Issue
Block a user