sc7-default-firmware/waveform.py

133 lines
5.8 KiB
Python
Raw Normal View History

2023-11-03 09:47:33 -07:00
import dma_defs
import pin_defs
import codec
import math
import struct
import time
import array
import rp2
import machine
from uctypes import addressof
## Set up sample memory arrays
_DEBUG = const(False)
class Waveform():
def __init__(self, num_samples_per_frame=256):
self.outBuffer_ready = False ## flag raised when a sample gets stored and the outBuffers can be written to
## your code needs to clear this from outside, then wait for next True to sync up
## at 30 kHz it's about every 38 ms.
self.num_samples = num_samples_per_frame
self.outBufferX = bytearray(self.num_samples * 2) ## 2 bytes, 1 channel
self.outBufferY = bytearray(self.num_samples * 2)
self.outBuffer = bytearray(self.num_samples * 4) ## interleave
self.outBuffer_addr = array.array("L", [addressof(self.outBuffer)])
## This DMA takes the whole block of samples and feeds them off to
## the codec's write PIO state machine
## It fires off an IRQ (feed_dac_irq_handler) when it's done with a round
## feed_dac_irq_handler interleaves the two sample channels and queues them up
self.feed_dac_transfer = rp2.DMA()
## And this DMA is chained to the above, resets it back to the beginning
self.feed_dac_control = rp2.DMA()
## Control defs:
## {'inc_read': 0, 'high_pri': 0, 'ring_sel': 0, 'size': 0,
## 'enable': 0, 'treq_sel': 0, 'sniff_en': 0, 'chain_to': 0,
## 'inc_write': 0, 'ring_size': 0, 'bswap': 0, 'IRQ_quiet': 0}
self.feed_dac_transfer.ctrl = self.feed_dac_transfer.pack_ctrl(default = 0,
size = dma_defs.SIZE_4BYTES,
enable = 1,
treq_sel = dma_defs.DREQ_PIO0_TX1,
chain_to = self.feed_dac_control.channel_id,
bswap = 1,
IRQ_quiet = 0,
inc_read = 1)
self.feed_dac_transfer.config(count = self.num_samples, write = codec.WRITE_FIFO)
self.feed_dac_transfer.irq(handler=self.feed_dac_irq_handler, hard=False)
self.feed_dac_control.ctrl = self.feed_dac_control.pack_ctrl(default = 0,
size = dma_defs.SIZE_4BYTES, ## addresses
enable = 1,
chain_to = self.feed_dac_control.channel_id, ## no chain
treq_sel = dma_defs.TREQ_PERMANENT, ## always on
IRQ_quiet = 1)
self.feed_dac_control.config(count = 1,
write = self.feed_dac_transfer.registers[15:],
read = addressof(self.outBuffer_addr),
trigger = True)
if _DEBUG:
self.debug_pin = machine.Pin(28, machine.Pin.OUT, value=0)
def init(self):
self.__init__()
def deinit(self):
"""unwind all of the above"""
print("(@_@) TODO: verify DMA shuts down")
self.feed_dac_control.ctrl = 0
time.sleep_ms(10) ## (@_@) replace with asyncio when necessary
self.feed_dac_transfer.ctrl = 0
self.feed_dac_control.close()
time.sleep_ms(10) ## (@_@) replace with asyncio when necessary
self.feed_dac_transfer.close()
## 175 us? not horrible -- roughly 6 sample frames, have 256
@micropython.viper
def interleave_buffers(self):
bufX_p = ptr16(self.outBufferX)
bufY_p = ptr16(self.outBufferY)
out_buffer_p = ptr32(self.outBuffer)
num_samples = int(self.num_samples)
for i in range(num_samples):
out_buffer_p[i] = (bufY_p[i] << 16) + bufX_p[i]
@micropython.viper
def feed_dac_irq_handler(self, calling_dma):
## this is where the magic happens
## roughly every 8.5 ms @ 256 samples
if _DEBUG:
self.debug_pin.high()
self.interleave_buffers()
self.outBuffer_ready = True
## Ping the user to load up their data
if _DEBUG:
self.debug_pin.low()
## Get data into the buffers
@micropython.viper
def _constant(self, value:int, buffer):
buffer_p = ptr8(buffer)
num_samples = int(self.num_samples)
for i in range(num_samples):
buffer_p[2*i+0] = (value & 0xFF00) >> 8
buffer_p[2*i+1] = value & 0x00FF
@micropython.viper
def _pack_wave(self, wavelist, buffer):
"""takes a list of integers, +/- 15 bits worth, and packs it into a buffer"""
buffer_p = ptr8(buffer)
num_samples = int(self.num_samples)
for i in range(num_samples):
sample = int(wavelist[i])
buffer_p[2*i+0] = (sample & 0xFF00) >> 8
buffer_p[2*i+1] = sample & 0x00FF
## Convenience functions
def packX(self, wavelist):
self._pack_wave(wavelist, self.outBufferX)
def packY(self, wavelist):
self._pack_wave(wavelist, self.outBufferY)
def constantX(self, value:int):
self._constant(value, self.outBufferX)
def constantY(self, value:int):
self._constant(value, self.outBufferY)
def point(self, x:int, y:int):
self._constant(x, self.outBufferX)
self._constant(y, self.outBufferY)