"""Capture datapoints from the Portiloop ADS frontend.

Provides the register configurations written to the frontend, a capture
routine meant to run in a child process, a live plot helper, and a
``Capture`` class exposing ipywidgets controls to start a capture with
optional live display and EDF recording.
"""

import multiprocessing as mp
import os
import shutil
import time
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from queue import Empty
from time import sleep

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output, display
from playsound import playsound
from pyedflib import highlevel

from portiloop.hardware.frontend import Frontend
from portiloop.hardware.leds import LEDs, Color
from portilooplot.jupyter_plot import ProgressPlot


DEFAULT_FRONTEND_CONFIG = [
    # nomenclature: name [default setting] [bits 7-0] : description
    # Read only ID:
    0x3E,  # ID [xx] [REV_ID[2:0], 1, DEV_ID[1:0], NU_CH[1:0]] : (RO)
    # Global Settings Across Channels:
    0x96,  # CONFIG1 [96] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 250 SPS
    0xC0,  # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]] : No tests
    0x60,  # CONFIG3 [60] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias
    0x00,  # LOFF [00] [COMP_TH[2:0], 0, ILEAD_OFF[1:0], FLEAD_OFF[1:0]] : No lead-off
    # Channel-Specific Settings:
    0x61,  # CH1SET [61] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]] : Channel 1 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH2SET [61] [PD2, GAIN2[2:0], SRB2, MUX2[2:0]] : Channel 2 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH3SET [61] [PD3, GAIN3[2:0], SRB2, MUX3[2:0]] : Channel 3 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH4SET [61] [PD4, GAIN4[2:0], SRB2, MUX4[2:0]] : Channel 4 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH5SET [61] [PD5, GAIN5[2:0], SRB2, MUX5[2:0]] : Channel 5 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH6SET [61] [PD6, GAIN6[2:0], SRB2, MUX6[2:0]] : Channel 6 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH7SET [61] [PD7, GAIN7[2:0], SRB2, MUX7[2:0]] : Channel 7 active, 24 gain, no SRB2 & input shorted
    0x61,  # CH8SET [61] [PD8, GAIN8[2:0], SRB2, MUX8[2:0]] : Channel 8 active, 24 gain, no SRB2 & input shorted
    0x00,  # BIAS_SENSP [00] [BIASP8, BIASP7, BIASP6, BIASP5, BIASP4, BIASP3, BIASP2, BIASP1] : No bias
    0x00,  # BIAS_SENSN [00] [BIASN8, BIASN7, BIASN6, BIASN5, BIASN4, BIASN3, BIASN2, BIASN1] No bias
    0x00,  # LOFF_SENSP [00] [LOFFP8, LOFFP7, LOFFP6, LOFFP5, LOFFP4, LOFFP3, LOFFP2, LOFFP1] : No lead-off
    0x00,  # LOFF_SENSN [00] [LOFFM8, LOFFM7, LOFFM6, LOFFM5, LOFFM4, LOFFM3, LOFFM2, LOFFM1] : No lead-off
    0x00,  # LOFF_FLIP [00] [LOFF_FLIP8, LOFF_FLIP7, LOFF_FLIP6, LOFF_FLIP5, LOFF_FLIP4, LOFF_FLIP3, LOFF_FLIP2, LOFF_FLIP1] : No lead-off flip
    # Lead-Off Status Registers (Read-Only Registers):
    0x00,  # LOFF_STATP [00] [IN8P_OFF, IN7P_OFF, IN6P_OFF, IN5P_OFF, IN4P_OFF, IN3P_OFF, IN2P_OFF, IN1P_OFF] : Lead-off positive status (RO)
    0x00,  # LOFF_STATN [00] [IN8M_OFF, IN7M_OFF, IN6M_OFF, IN5M_OFF, IN4M_OFF, IN3M_OFF, IN2M_OFF, IN1M_OFF] : Lead-off negative status (RO)
    # GPIO and OTHER Registers:
    0x0F,  # GPIO [0F] [GPIOD[4:1], GPIOC[4:1]] : All GPIOs as inputs
    0x00,  # MISC1 [00] [0, 0, SRB1, 0, 0, 0, 0, 0] : Disable SRBM
    0x00,  # MISC2 [00] [00] : Unused
    0x00,  # CONFIG4 [00] [0, 0, 0, 0, SINGLE_SHOT, 0, PD_LOFF_COMP(bar), 0] : Single-shot, lead-off comparator disabled
]


FRONTEND_CONFIG = [
    0x3E,  # ID (RO)
    0x95,  # CONFIG1 [95] [1, DAISY_EN(bar), CLK_EN, 1, 0, DR[2:0]] : Datarate = 500 SPS
    0xD0,  # CONFIG2 [C0] [1, 1, 0, INT_CAL, 0, CAL_AMP0, CAL_FREQ[1:0]]
    0xE0,  # CONFIG3 [E0] [PD_REFBUF(bar), 1, 1, BIAS_MEAS, BIASREF_INT, PD_BIAS(bar), BIAS_LOFF_SENS, BIAS_STAT] : Power-down reference buffer, no bias
    0x00,  # No lead-off
    0x03,  # CH1SET [60] [PD1, GAIN1[2:0], SRB2, MUX1[2:0]]
    0x00,  # CH2SET
    0x00,  # CH3SET
    0x00,  # CH4SET
    0x00,  # CH5SET voltage
    0x00,  # CH6SET voltage
    0x00,  # CH7SET test
    0x04,  # CH8SET temperature
    0x00,  # BIAS_SENSP
    0x00,  # BIAS_SENSN
    0xFF,  # LOFF_SENSP Lead-off on all positive pins?
    0xFF,  # LOFF_SENSN Lead-off on all negative pins?
    0x00,  # Normal lead-off
    0x00,  # Lead-off positive status (RO)
    0x00,  # Lead-off negative status (RO)
    0x00,  # All GPIOs as output ?
    0x20,  # Enable SRB1
]


def mod_config(config, datarate):
    """Set the data-rate bits of ``config[1]`` for the requested datarate.

    The three low bits of ``config[1]`` are replaced by the code of the
    smallest supported rate that is >= ``datarate``; when ``datarate`` exceeds
    every supported rate, the code of the fastest one (16000) is used.

    Args:
        config: mutable sequence of register values; modified in place.
        datarate: requested number of samples per second.

    Returns:
        The same ``config`` object, after modification.
    """
    possible_datarates = [(250, 0x06),
                          (500, 0x05),
                          (1000, 0x04),
                          (2000, 0x03),
                          (4000, 0x02),
                          (8000, 0x01),
                          (16000, 0x00)]
    mod_dr = 0x00  # fallback: fastest supported rate
    for rate, code in possible_datarates:
        if rate >= datarate:
            mod_dr = code
            break

    # Keep the five high bits untouched, overwrite only the three low bits.
    config[1] = (config[1] & 0xF8) | mod_dr
    print(f"DEBUG: new cf1: {hex(config[1])}")
    return config


def filter_24(value):
    """Scale ``value`` by 4.5 / (2**23 - 1)."""
    return (value * 4.5) / (2**23 - 1)  # 23 because 1 bit is lost for sign


def filter_2scomplement_np(value):
    """Decode 24-bit two's complement integers and scale them with ``filter_24``.

    Args:
        value: numpy integer array of raw 24-bit readings.

    Returns:
        numpy array of the scaled signed values.
    """
    # Bit 23 set means the 24-bit value is negative.
    signed = np.where((value & (1 << 23)) != 0, value - (1 << 24), value)
    return filter_24(signed)


class LiveDisplay():
    """Live plot of captured datapoints backed by a ``ProgressPlot``."""

    def __init__(self, datapoint_dim=8, window_len=100):
        """
        Args:
            datapoint_dim: number of values per datapoint (one plot each)
            window_len: passed to ``ProgressPlot`` as ``max_window_len``
        """
        self.datapoint_dim = datapoint_dim
        self.queue = mp.Queue()
        channel_names = [f"channel#{i+1}" for i in range(datapoint_dim)]
        # Rename the special channels only when they exist, so that a
        # datapoint_dim smaller than 8 does not raise IndexError.
        if datapoint_dim > 0:
            channel_names[0] = "voltage"
        if datapoint_dim > 7:
            channel_names[7] = "temperature"
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)

    def add_datapoints(self, datapoints):
        """
        Adds several datapoints to the plot

        Args:
            datapoints: iterable of datapoints, each one an iterable of
                per-channel values
        """
        disp_list = [[[elt] for elt in datapoint] for datapoint in datapoints]
        self.pp.update_with_datapoints(disp_list)

    def add_datapoint(self, datapoint):
        """
        Adds a single datapoint to the plot

        Args:
            datapoint: iterable of per-channel values
        """
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)


def _capture_process(q_data, q_out, q_in, duration, frequency, python_clock=True):
    """
    Reads datapoints from the frontend and puts them in q_data.

    Args:
        q_data: multiprocessing.Queue: captured datapoints are put in the queue
        q_out: multiprocessing.Queue: to pass messages to the parent process
            'STOP': end of the process
        q_in: multiprocessing.Queue: to pass messages from the parent process
            (currently not polled, it is only closed at the end)
        duration: capture duration in seconds; <= 0 means no time limit
        frequency: sampling frequency in Hz
        python_clock: if True, samples are paced with time.sleep and the
            frontend is configured at 2 * frequency; otherwise the frontend is
            configured at frequency and paced by frontend.wait_new_data()

    Raises:
        RuntimeError: if the ID register or the written configuration cannot
            be read back as expected.
    """
    if duration <= 0:
        duration = np.inf

    sample_time = 1 / frequency

    frontend = Frontend()
    leds = LEDs()
    leds.led2(Color.PURPLE)
    leds.aquisition(True)

    try:
        data = frontend.read_regs(0x00, 1)
        if data != [0x3E]:
            raise RuntimeError("The communication with the ADS cannot be established.")
        leds.led2(Color.BLUE)

        # Work on a copy so that the module-level FRONTEND_CONFIG is not modified.
        config = list(FRONTEND_CONFIG)
        if python_clock:  # set ADS to 2 * frequency
            config = mod_config(config, 2 * frequency)
        else:  # set ADS to frequency
            config = mod_config(config, frequency)

        frontend.write_regs(0x00, config)
        data = frontend.read_regs(0x00, len(config))
        if data != config:
            raise RuntimeError(f"Wrong config: {data} vs {config}")
        frontend.start()
        leds.led2(Color.PURPLE)
        while not frontend.is_ready():
            pass

        # Set up of leds
        leds.aquisition(True)
        sleep(0.5)
        leds.aquisition(False)
        sleep(0.5)
        leds.aquisition(True)

        c = True

        it = 0
        t_start = time.time()
        t_max = t_start + duration
        t = t_start

        # first sample:
        reading = frontend.read()
        datapoint = reading.channels()
        q_data.put(datapoint)

        t_next = t + sample_time

        # sampling loop:
        while c and t < t_max:
            t = time.time()
            if python_clock:
                if t <= t_next:
                    time.sleep(t_next - t)
                t_next += sample_time
                reading = frontend.read()
            else:
                reading = frontend.wait_new_data()
            datapoint = reading.channels()
            q_data.put(datapoint)

            # NOTE: q_in is deliberately not polled for a 'STOP' message here;
            # the original author found q_in.get_nowait() too slow for this loop.

            it += 1
        elapsed = time.time() - t_start
        # it only counts samples taken in the loop; skip the report when the
        # loop did not run to avoid a division by zero.
        if it > 0 and elapsed > 0:
            tot = elapsed / it
            print(f"Average frequency: {1 / tot} Hz for {it} samples")

        leds.aquisition(False)

    finally:
        frontend.close()
        leds.close()
        q_in.close()
        q_out.put('STOP')


class Capture:
    """ipywidgets control panel that runs a capture in a child process.

    The widgets set the frequency, duration, clock source, recording file name
    and the record/display options; toggling 'Start' calls start_capture().
    """

    def __init__(self):
        self.filename = self._default_filename()
        self._p_capture = None
        self.__capture_on = False
        self.frequency = 250
        self.duration = 10
        self.record = False
        self.display = False
        self.recording_file = None
        self.python_clock = True
        self.binfile = None
        self.temp_path = Path.home() / '.temp'

        # widgets

        self.b_capture = widgets.ToggleButtons(
            options=['Stop', 'Start'],
            description='Capture:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Stop capture', 'Start capture'],
            # icons=['check'] * 2
        )

        self.b_clock = widgets.ToggleButtons(
            options=['Coral', 'ADS'],
            description='Clock:',
            disabled=False,
            button_style='',  # 'success', 'info', 'warning', 'danger' or ''
            tooltips=['Use Coral clock (very precise, not very timely)',
                      'Use ADS clock (not very precise, very timely)'],
            # icons=['check'] * 2
        )

        self.b_filename = widgets.Text(
            # The Text widget needs a string; show only the file name since
            # on_b_filename() always prepends the edf_recording folder.
            value=self.filename.name,
            description='Filename:',
            placeholder='All files will be in the edf_recording folder',
            disabled=False
        )

        self.b_frequency = widgets.IntText(
            value=250,
            description='Freq (Hz):',
            disabled=False
        )

        self.b_duration = widgets.IntText(
            value=10,
            description='Time (s):',
            disabled=False
        )

        self.b_record = widgets.Checkbox(
            value=False,
            description='Record',
            disabled=False,
            indent=False
        )

        self.b_display = widgets.Checkbox(
            value=False,
            description='Display',
            disabled=False,
            indent=False
        )

        self.b_capture.observe(self.on_b_capture, 'value')
        self.b_clock.observe(self.on_b_clock, 'value')
        self.b_frequency.observe(self.on_b_frequency, 'value')
        self.b_duration.observe(self.on_b_duration, 'value')
        self.b_record.observe(self.on_b_record, 'value')
        self.b_display.observe(self.on_b_display, 'value')
        self.b_filename.observe(self.on_b_filename, 'value')

        self.display_buttons()

    @staticmethod
    def _default_filename():
        """Return a timestamped .edf path in the edf_recording folder of the home directory."""
        now = datetime.now()
        return Path.home() / 'edf_recording' / f"recording_{now.strftime('%m_%d_%Y_%H_%M_%S')}.edf"

    def __del__(self):
        self.b_capture.close()

    def display_buttons(self):
        """Display all the control widgets."""
        display(widgets.VBox([self.b_filename,
                              self.b_frequency,
                              self.b_duration,
                              widgets.HBox([self.b_record, self.b_display]),
                              self.b_clock,
                              self.b_capture]))

    def on_b_capture(self, value):
        """Start a capture on 'Start'; clear the output and redraw the widgets on 'Stop'."""
        val = value['new']
        if val == 'Start':
            self.start_capture(
                record=self.record,
                viz=self.display,
                width=500,
                python_clock=self.python_clock)
        elif val == 'Stop':
            clear_output()
            self.display_buttons()
        else:
            print(f"This option is not supported: {val}.")

    def on_b_clock(self, value):
        """Select the clock source: 'Coral' enables python_clock, 'ADS' disables it."""
        val = value['new']
        if val == 'Coral':
            self.python_clock = True
        elif val == 'ADS':
            self.python_clock = False
        else:
            print(f"This option is not supported: {val}.")

    def on_b_frequency(self, value):
        """Store the new frequency if it is strictly positive."""
        val = value['new']
        if val > 0:
            self.frequency = val
        else:
            print(f"Unsupported frequency: {val} Hz")

    def on_b_filename(self, value):
        """Store the new file name, falling back to a timestamped name when empty."""
        val = value['new']
        if val != '':
            self.filename = Path.home() / 'edf_recording' / val
        else:
            self.filename = self._default_filename()

    def on_b_duration(self, value):
        """Store the new duration if it is strictly positive."""
        val = value['new']
        if val > 0:
            self.duration = val
        else:
            print(f"Unsupported duration: {val} s")

    def on_b_record(self, value):
        """Store the state of the 'Record' checkbox."""
        val = value['new']
        self.record = val

    def on_b_display(self, value):
        """Store the state of the 'Display' checkbox."""
        val = value['new']
        self.display = val

    def open_recording_file(self):
        """Open the temporary binary file in which datapoints are accumulated."""
        print(f"Will store edf recording in {self.filename}")
        # exist_ok: a temp folder left behind by an interrupted run must not
        # prevent a new recording.
        self.temp_path.mkdir(parents=True, exist_ok=True)
        # Make sure the destination folder of the EDF file exists.
        Path(self.filename).parent.mkdir(parents=True, exist_ok=True)
        self.binfile = open(self.temp_path / 'data.bin', 'wb')

    def close_recording_file(self):
        """Convert the temporary binary file to EDF and delete the temporary folder."""
        print('Saving recording data...')
        # Close first so that buffered data is flushed before reading it back.
        self.binfile.close()
        self.binfile = None

        # Channel names
        channels = ['Voltage', 'Ch2', 'Ch3', 'Ch4', 'Ch5', 'Ch6', 'Ch7', 'Temperature']

        # Read binary data
        data = np.fromfile(self.temp_path / 'data.bin', dtype=float)
        # add_recording_data() writes datapoints one after the other, so the
        # file is laid out sample by sample (assumes 8 values per datapoint,
        # matching the 8 channel names). Transpose to one row per channel.
        data = np.ascontiguousarray(data.reshape((-1, len(channels))).T)

        # Declare and write EDF format file
        signal_headers = highlevel.make_signal_headers(channels, sample_frequency=self.frequency)
        header = highlevel.make_header(patientname='patient_x', gender='Female')
        highlevel.write_edf(str(self.filename), data, signal_headers, header)

        # Delete temp binary file
        shutil.rmtree(self.temp_path)
        print('...done')

    def add_recording_data(self, data):
        """Append ``data`` to the temporary binary file."""
        np.array(data).tofile(self.binfile)

    def start_capture(self,
                      record=True,
                      viz=False,
                      width=500,
                      python_clock=True):
        """Run a capture and block until the capture process reports 'STOP'.

        Args:
            record: if True, datapoints are saved to an EDF file
            viz: if True, datapoints are plotted live
            width: window length of the live plot
            python_clock: forwarded to _capture_process
        """
        # Check before creating anything, so that the queues of an ongoing
        # capture are not replaced.
        if self.__capture_on:
            print("Capture is already ongoing, ignoring command.")
            return
        self.__capture_on = True

        try:
            self.q_messages_send = mp.Queue()
            self.q_messages_recv = mp.Queue()
            self.q_data = mp.Queue()

            sample_time = 1 / self.frequency

            # Open the recording file before starting the process, so that a
            # failure here does not leave a capture process running.
            if record:
                self.open_recording_file()

            self._p_capture = mp.Process(target=_capture_process,
                                         args=(self.q_data,
                                               self.q_messages_recv,
                                               self.q_messages_send,
                                               self.duration,
                                               self.frequency,
                                               python_clock))
            self._p_capture.start()

            live_disp = LiveDisplay(window_len=width) if viz else None

            running = True
            while running:
                try:
                    mess = self.q_messages_recv.get_nowait()
                    if mess == 'STOP':
                        running = False
                except Empty:
                    pass

                # retrieve up to 25 data points from q_data:
                res = []
                while len(res) < 25:
                    try:
                        res.append(self.q_data.get(timeout=sample_time))
                    except Empty:
                        break
                if not res:
                    continue

                n_array = filter_2scomplement_np(np.array(res))
                to_add = n_array.tolist()

                if live_disp is not None:
                    live_disp.add_datapoints(to_add)
                if record:
                    self.add_recording_data(to_add)

            # empty q_data
            while True:
                try:
                    _ = self.q_data.get_nowait()
                except Empty:
                    break
            self.q_messages_recv.close()
            self.q_data.close()

            if record:
                self.close_recording_file()

            # print("DEBUG: joining capture process...")
            self._p_capture.join()
            # print("DEBUG: capture process joined.")
        finally:
            # Always release the flag, otherwise an error would block every
            # later capture.
            self.__capture_on = False


if __name__ == "__main__":
    # TODO: Argparse this
    pass