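# NOTE: the three lines originally here ("Spaces:", "Sleeping", "Sleeping") were
# web-page status text captured together with the source; they are not part of
# the program.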
import multiprocessing as mp
import os
import shutil
import time
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from queue import Empty
from time import sleep

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output, display
from playsound import playsound
from portilooplot.jupyter_plot import ProgressPlot
from pyedflib import highlevel

from portiloop.hardware.frontend import Frontend
from portiloop.hardware.leds import LEDs, Color
# Reference register image for the ADS front-end, one byte per register,
# starting at register address 0x00.
# Nomenclature: name [default setting] [bits 7-0] : description
DEFAULT_FRONTEND_CONFIG = [
    # Read-only ID:
    0x3E,  # ID [xx] [REV_ID[2:0], 1, DEV_ID[1:0], NU_CH[1:0]] : (RO)
    # Global settings across channels:
    0x96,  # CONFIG1 [96] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 250 SPS
    0xC0,  # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : No tests
    0x60,  # CONFIG3 [60] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias
    0x00,  # LOFF [00] [COMP_TH[2:0], 0, ILEAD_OFF[1:0], FLEAD_OFF[1:0]] : No lead-off
    # Channel-specific settings:
    0x61,  # CH1SET [61] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] : Channel 1 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH2SET [61] [PD2, GAIN2[2:0], SRB2, MUX2[2:0]] : Channel 2 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH3SET [61] [PD3, GAIN3[2:0], SRB2, MUX3[2:0]] : Channel 3 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH4SET [61] [PD4, GAIN4[2:0], SRB2, MUX4[2:0]] : Channel 4 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH5SET [61] [PD5, GAIN5[2:0], SRB2, MUX5[2:0]] : Channel 5 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH6SET [61] [PD6, GAIN6[2:0], SRB2, MUX6[2:0]] : Channel 6 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH7SET [61] [PD7, GAIN7[2:0], SRB2, MUX7[2:0]] : Channel 7 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH8SET [61] [PD8, GAIN8[2:0], SRB2, MUX8[2:0]] : Channel 8 active, 24 gain, no SRB2 & input shorted
    0x00,  # BIAS_SENSP [00] [BIASP8, BIASP7, BIASP6, BIASP5, BIASP4, BIASP3, BIASP2, BIASP1] : No bias
    0x00,  # BIAS_SENSN [00] [BIASN8, BIASN7, BIASN6, BIASN5, BIASN4, BIASN3, BIASN2, BIASN1] : No bias
    0x00,  # LOFF_SENSP [00] [LOFFP8, LOFFP7, LOFFP6, LOFFP5, LOFFP4, LOFFP3, LOFFP2, LOFFP1] : No lead-off
    0x00,  # LOFF_SENSN [00] [LOFFM8, LOFFM7, LOFFM6, LOFFM5, LOFFM4, LOFFM3, LOFFM2, LOFFM1] : No lead-off
    0x00,  # LOFF_FLIP [00] [LOFF_FLIP8, LOFF_FLIP7, LOFF_FLIP6, LOFF_FLIP5, LOFF_FLIP4, LOFF_FLIP3, LOFF_FLIP2, LOFF_FLIP1] : No lead-off flip
    # Lead-off status registers (read-only):
    0x00,  # LOFF_STATP [00] [IN8P_OFF, IN7P_OFF, IN6P_OFF, IN5P_OFF, IN4P_OFF, IN3P_OFF, IN2P_OFF, IN1P_OFF] : Lead-off positive status (RO)
    0x00,  # LOFF_STATN [00] [IN8M_OFF, IN7M_OFF, IN6M_OFF, IN5M_OFF, IN4M_OFF, IN3M_OFF, IN2M_OFF, IN1M_OFF] : Lead-off negative status (RO)
    # GPIO and other registers:
    0x0F,  # GPIO [0F] [GPIOD[4:1], GPIOC[4:1]] : All GPIOs as inputs
    0x00,  # MISC1 [00] [0, 0, SRB1, 0, 0, 0, 0, 0] : SRB1 bit cleared
    0x00,  # MISC2 [00] [00] : Unused
    # NOTE(review): the SINGLE_SHOT bit is 0 here; the original note said
    # "Single-shot", which presumably describes the bit name rather than the
    # selected mode -- confirm against the ADS datasheet.
    0x00,  # CONFIG4 [00] [0, 0, 0, 0, SINGLE_SHOT, 0, PD_LOFF_COMP(bar), 0] : SINGLE_SHOT = 0, lead-off comparator disabled
]
# Register image actually written to the ADS front-end by the capture process,
# one byte per register starting at address 0x00 (ID .. MISC1, 22 registers).
# Bit layouts are the ones listed in DEFAULT_FRONTEND_CONFIG.
FRONTEND_CONFIG = [
    0x3E,  # ID (RO)
    0x95,  # CONFIG1 [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 500 SPS (DR bits are overwritten by mod_config)
    0xD0,  # CONFIG2 [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : INT_CAL bit set (chip default is C0)
    # NOTE(review): PD_REFBUF(bar) is set, which with an active-low bit
    # presumably means the reference buffer is enabled, not powered down --
    # confirm against the ADS datasheet.
    0xE0,  # CONFIG3 [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : PD_REFBUF(bar) = 1, no bias
    0x00,  # LOFF : No lead-off
    0x03,  # CH1SET [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] : active, gain bits 000, MUX = 011 (plotted/recorded as "voltage")
    0x00,  # CH2SET : active, gain bits 000, MUX = 000
    0x00,  # CH3SET
    0x00,  # CH4SET
    0x00,  # CH5SET (formerly annotated "voltage"; same value as CH2-CH4)
    0x00,  # CH6SET (formerly annotated "voltage"; same value as CH2-CH4)
    0x00,  # CH7SET (formerly annotated "test"; same value as CH2-CH4)
    0x04,  # CH8SET : active, gain bits 000, MUX = 100 (plotted/recorded as "temperature")
    0x00,  # BIAS_SENSP
    0x00,  # BIAS_SENSN
    0xFF,  # LOFF_SENSP : all bits set -- lead-off on all positive pins? (unconfirmed)
    0xFF,  # LOFF_SENSN : all bits set -- lead-off on all negative pins? (unconfirmed)
    0x00,  # LOFF_FLIP : Normal lead-off
    0x00,  # LOFF_STATP : Lead-off positive status (RO)
    0x00,  # LOFF_STATN : Lead-off negative status (RO)
    0x00,  # GPIO : all bits cleared -- all GPIOs as outputs? (unconfirmed)
    0x20,  # MISC1 [0, 0, SRB1, 0, 0, 0, 0, 0] : Enable SRB1
]
def mod_config(config, datarate):
    """Return a copy of ``config`` whose CONFIG1 data-rate bits match ``datarate``.

    The slowest supported rate that is greater than or equal to ``datarate``
    is selected; when ``datarate`` exceeds every supported rate the fastest
    one (16000 SPS, code 0x00) is used.

    Args:
        config: sequence of register bytes; index 1 is CONFIG1, whose three
            low bits (DR[2:0]) encode the data rate.
        datarate: requested data rate in samples per second.

    Returns:
        A new list with the updated CONFIG1 byte. The input sequence is left
        untouched so that shared templates such as FRONTEND_CONFIG are not
        modified as a side effect.
    """
    possible_datarates = [(250, 0x06),
                          (500, 0x05),
                          (1000, 0x04),
                          (2000, 0x03),
                          (4000, 0x02),
                          (8000, 0x01),
                          (16000, 0x00)]
    mod_dr = 0x00  # fallback: fastest rate
    for rate, code in possible_datarates:
        if rate >= datarate:
            mod_dr = code
            break

    new_config = list(config)
    # Clear DR[2:0] and insert the selected code; upper bits are preserved.
    new_config[1] = (new_config[1] & 0xF8) | mod_dr
    print(f"DEBUG: new cf1: {hex(new_config[1])}")
    return new_config
def filter_24(value):
    """Scale a signed 24-bit sample so that full scale maps to +/-4.5.

    Args:
        value: signed integer sample (scalar or numpy array).

    Returns:
        ``value * 4.5 / (2**23 - 1)``. The 4.5 factor is presumably the ADC
        reference voltage in volts -- confirm against the hardware setup.
    """
    return (value * 4.5) / (2**23 - 1)  # 23 because 1 bit is lost for sign
def filter_2scomplement_np(value):
    """Decode raw 24-bit two's-complement samples and scale them with filter_24.

    Args:
        value: integer array (or nested list of integers) of raw 24-bit codes.

    Returns:
        numpy array of the same shape holding the scaled signed values.
    """
    value = np.asarray(value)
    # Bit 23 is the sign bit: when set, the code represents value - 2**24.
    v = np.where((value & (1 << 23)) != 0, value - (1 << 24), value)
    return filter_24(v)
class LiveDisplay():
    """Live multi-channel plot backed by a portilooplot ``ProgressPlot``."""

    def __init__(self, datapoint_dim=8, window_len=100):
        """
        Args:
            datapoint_dim: number of channels (curves) in each datapoint
            window_len: passed to ProgressPlot as ``max_window_len``
        """
        self.datapoint_dim = datapoint_dim
        self.queue = mp.Queue()
        channel_names = [f"channel#{i+1}" for i in range(datapoint_dim)]
        # The first and eighth channels carry special signals in the capture
        # configuration; only rename them when those indices actually exist.
        if datapoint_dim > 0:
            channel_names[0] = "voltage"
        if datapoint_dim > 7:
            channel_names[7] = "temperature"
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)

    def add_datapoints(self, datapoints):
        """
        Adds a batch of datapoints to the plot

        Args:
            datapoints: iterable of datapoints, each one an iterable of
                per-channel values; every value is wrapped in a one-element
                list before being handed to ProgressPlot.update_with_datapoints
        """
        disp_list = [[[elt] for elt in datapoint] for datapoint in datapoints]
        self.pp.update_with_datapoints(disp_list)

    def add_datapoint(self, datapoint):
        """
        Adds a single datapoint to the plot

        Args:
            datapoint: iterable of per-channel values
        """
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)
def _capture_process(q_data, q_out, q_in, duration, frequency, python_clock=True):
    """
    Configure the ADS front-end, then stream datapoints to the parent process.

    Args:
        q_data: multiprocessing.Queue: captured datapoints are put in the queue
        q_out: multiprocessing.Queue: to pass messages to the parent process
            'STOP': end of the process (always sent, even when an error occurs)
        q_in: multiprocessing.Queue: to pass messages from the parent process
            'STOP': stops the process (polling is currently disabled, see the
            NOTE in the sampling loop; the queue is only closed on exit)
        duration: capture length in seconds; a value <= 0 means no time limit
        frequency: target sampling frequency in Hz
        python_clock: when True, reads are paced with time.sleep and the ADS is
            configured for twice the target frequency; when False, the ADS is
            configured for the target frequency and each read blocks on
            frontend.wait_new_data()
    """
    if duration <= 0:
        duration = np.inf
    sample_time = 1 / frequency

    frontend = None
    leds = None
    try:
        frontend = Frontend()
        leds = LEDs()
        leds.led2(Color.PURPLE)
        leds.aquisition(True)

        data = frontend.read_regs(0x00, 1)
        assert data == [0x3E], "The communication with the ADS cannot be established."
        leds.led2(Color.BLUE)

        # Work on a copy so the module-level template is never modified.
        config = list(FRONTEND_CONFIG)
        if python_clock:  # set ADS to 2 * frequency
            config = mod_config(config, 2 * frequency)
        else:  # set ADS to frequency
            config = mod_config(config, frequency)
        frontend.write_regs(0x00, config)
        data = frontend.read_regs(0x00, len(config))
        assert data == config, f"Wrong config: {data} vs {config}"

        frontend.start()
        leds.led2(Color.PURPLE)
        while not frontend.is_ready():
            pass

        # Set up of leds
        leds.aquisition(True)
        sleep(0.5)
        leds.aquisition(False)
        sleep(0.5)
        leds.aquisition(True)

        it = 0
        t_start = time.time()
        t_max = t_start + duration
        t = t_start

        # first sample:
        reading = frontend.read()
        datapoint = reading.channels()
        q_data.put(datapoint)
        t_next = t + sample_time

        # sampling loop:
        while t < t_max:
            t = time.time()
            if python_clock:
                if t <= t_next:
                    time.sleep(t_next - t)
                t_next += sample_time
                reading = frontend.read()
            else:
                reading = frontend.wait_new_data()
            datapoint = reading.channels()
            q_data.put(datapoint)
            # NOTE: polling q_in for a 'STOP' message here was disabled in the
            # original code because it took too long per iteration.
            it += 1

        t = time.time()
        if it > 0 and t > t_start:
            tot = (t - t_start) / it
            print(f"Average frequency: {1 / tot} Hz for {it} samples")
        else:
            # The loop did not run (or no time elapsed): nothing to average.
            print(f"Average frequency: n/a for {it} samples")
        leds.aquisition(False)
    finally:
        # Release hardware first, and make sure the parent is always told that
        # this process is over, otherwise it would wait forever.
        try:
            if frontend is not None:
                frontend.close()
        finally:
            try:
                if leds is not None:
                    leds.close()
            finally:
                q_in.close()
                q_out.put('STOP')
class Capture:
    """Jupyter widget panel that configures and runs a capture session.

    A session spawns ``_capture_process`` in a child process, pulls the
    datapoints it produces, optionally plots them live and optionally records
    them to an EDF file under ``~/edf_recording``.
    """

    def __init__(self):
        self.filename = self._default_filename()
        self._p_capture = None
        self.__capture_on = False
        self.frequency = 250
        self.duration = 10
        self.record = False
        self.display = False
        self.recording_file = None
        self.python_clock = True
        self.binfile = None
        self.temp_path = Path.home() / '.temp'

        # widgets
        self.b_capture = widgets.ToggleButtons(
            options=['Stop', 'Start'],
            description='Capture:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Stop capture', 'Start capture'],
        )
        self.b_clock = widgets.ToggleButtons(
            options=['Coral', 'ADS'],
            description='Clock:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use Coral clock (very precise, not very timely)',
                      'Use ADS clock (not very precise, very timely)'],
        )
        self.b_filename = widgets.Text(
            # The widget holds the bare file name only: on_b_filename joins it
            # with the edf_recording folder.
            value=self.filename.name,
            description='Filename:',
            placeholder='All files will be in the edf_recording folder',
            disabled=False
        )
        self.b_frequency = widgets.IntText(
            value=250,
            description='Freq (Hz):',
            disabled=False
        )
        self.b_duration = widgets.IntText(
            value=10,
            description='Time (s):',
            disabled=False
        )
        self.b_record = widgets.Checkbox(
            value=False,
            description='Record',
            disabled=False,
            indent=False
        )
        self.b_display = widgets.Checkbox(
            value=False,
            description='Display',
            disabled=False,
            indent=False
        )

        self.b_capture.observe(self.on_b_capture, 'value')
        self.b_clock.observe(self.on_b_clock, 'value')
        self.b_frequency.observe(self.on_b_frequency, 'value')
        self.b_duration.observe(self.on_b_duration, 'value')
        self.b_record.observe(self.on_b_record, 'value')
        self.b_display.observe(self.on_b_display, 'value')
        self.b_filename.observe(self.on_b_filename, 'value')

        self.display_buttons()

    def __del__(self):
        # __init__ may have failed before the widget was created.
        button = getattr(self, 'b_capture', None)
        if button is not None:
            button.close()

    @staticmethod
    def _default_filename():
        """Return a timestamped EDF path inside ~/edf_recording."""
        now = datetime.now()
        return Path.home() / 'edf_recording' / f"recording_{now.strftime('%m_%d_%Y_%H_%M_%S')}.edf"

    @staticmethod
    def _deinterleave(flat, n_channels):
        """Reshape a sample-major flat stream into a (n_channels, n_samples) array.

        add_recording_data writes one datapoint (all channels) after the other,
        so the file is interleaved by sample. A trailing incomplete datapoint,
        if any, is dropped.
        """
        flat = np.asarray(flat)
        usable = flat.size - flat.size % n_channels
        return np.ascontiguousarray(flat[:usable].reshape((-1, n_channels)).T)

    def display_buttons(self):
        """Render the whole control panel in the current output cell."""
        display(widgets.VBox([self.b_frequency,
                              self.b_duration,
                              self.b_filename,
                              widgets.HBox([self.b_record, self.b_display]),
                              self.b_clock,
                              self.b_capture]))

    def on_b_capture(self, value):
        """Start a capture on 'Start'; clear and redraw the panel on 'Stop'."""
        val = value['new']
        if val == 'Start':
            self.start_capture(
                record=self.record,
                viz=self.display,
                width=500,
                python_clock=self.python_clock)
        elif val == 'Stop':
            clear_output()
            self.display_buttons()
        else:
            print(f"This option is not supported: {val}.")

    def on_b_clock(self, value):
        """Select the pacing clock: 'Coral' (python clock) or 'ADS'."""
        val = value['new']
        if val == 'Coral':
            self.python_clock = True
        elif val == 'ADS':
            self.python_clock = False
        else:
            print(f"This option is not supported: {val}.")

    def on_b_frequency(self, value):
        """Store a strictly positive sampling frequency; reject anything else."""
        val = value['new']
        if val > 0:
            self.frequency = val
        else:
            print(f"Unsupported frequency: {val} Hz")

    def on_b_filename(self, value):
        """Update the output path; an empty name falls back to a timestamped one."""
        val = value['new']
        if val != '':
            self.filename = Path.home() / 'edf_recording' / val
        else:
            self.filename = self._default_filename()

    def on_b_duration(self, value):
        """Store a strictly positive duration in seconds; reject anything else."""
        val = value['new']
        if val > 0:
            self.duration = val
        else:
            print(f"Unsupported duration: {val} s")

    def on_b_record(self, value):
        val = value['new']
        self.record = val

    def on_b_display(self, value):
        val = value['new']
        self.display = val

    def open_recording_file(self):
        """Open the temporary binary file that buffers samples during capture."""
        print(f"Will store edf recording in {self.filename}")
        # Both folders may be missing on first use, and the temp folder may be
        # left over from an interrupted session.
        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.filename.parent.mkdir(parents=True, exist_ok=True)
        self.binfile = open(self.temp_path / 'data.bin', 'wb')

    def close_recording_file(self):
        """Convert the buffered samples to an EDF file and remove the temp file."""
        print('Saving recording data...')
        # Channel names
        channels = ['Voltage', 'Ch2', 'Ch3', 'Ch4', 'Ch5', 'Ch6', 'Ch7', 'Temperature']

        # Close first so every buffered byte is on disk before reading it back.
        self.binfile.close()
        self.binfile = None

        # Read binary data
        data_path = self.temp_path / 'data.bin'
        data = self._deinterleave(np.fromfile(data_path, dtype=float), len(channels))

        # Declare and write EDF format file
        signal_headers = highlevel.make_signal_headers(channels, sample_frequency=self.frequency)
        header = highlevel.make_header(patientname='patient_x', gender='Female')
        highlevel.write_edf(str(self.filename), data, signal_headers, header)

        # Delete only what this class created; the temp folder is removed only
        # when nothing else lives in it.
        data_path.unlink()
        try:
            self.temp_path.rmdir()
        except OSError:
            pass
        print('...done')

    def add_recording_data(self, data):
        """Append a batch of datapoints (list of per-channel float lists) to the temp file."""
        np.asarray(data, dtype=float).tofile(self.binfile)

    def start_capture(self,
                      record=True,
                      viz=False,
                      width=500,
                      python_clock=True):
        """Run one capture session and block until the capture process ends.

        Args:
            record: when True, samples are saved to ``self.filename`` (EDF)
            viz: when True, samples are plotted live
            width: window length of the live plot
            python_clock: forwarded to ``_capture_process``
        """
        if self.__capture_on:
            print("Capture is already ongoing, ignoring command.")
            return
        self.__capture_on = True

        try:
            self.q_messages_send = mp.Queue()
            self.q_messages_recv = mp.Queue()
            self.q_data = mp.Queue()

            sample_time = 1 / self.frequency

            self._p_capture = mp.Process(target=_capture_process,
                                         args=(self.q_data,
                                               self.q_messages_recv,
                                               self.q_messages_send,
                                               self.duration,
                                               self.frequency,
                                               python_clock))
            self._p_capture.start()

            if viz:
                live_disp = LiveDisplay(window_len=width)

            if record:
                self.open_recording_file()

            running = True
            while running:
                try:
                    mess = self.q_messages_recv.get_nowait()
                    if mess == 'STOP':
                        running = False
                except Empty:
                    pass

                # retrieve up to 25 data points from q_data:
                res = []
                while len(res) < 25:
                    try:
                        res.append(self.q_data.get(timeout=sample_time))
                    except Empty:
                        break
                if not res:
                    continue

                n_array = filter_2scomplement_np(np.array(res))
                to_add = n_array.tolist()

                if viz:
                    live_disp.add_datapoints(to_add)
                if record:
                    self.add_recording_data(to_add)

            # empty q_data
            while True:
                try:
                    self.q_data.get_nowait()
                except Empty:
                    break
            self.q_messages_recv.close()
            self.q_data.close()

            if record:
                self.close_recording_file()

            self._p_capture.join()
        finally:
            # Always allow a new capture, even if this one failed midway.
            self.__capture_on = False
# Script entry point: intentionally does nothing yet; the module is meant to be
# driven through the Capture widget panel.
if __name__ == "__main__":
    # TODO: Argparse this
    pass