# MiloSobral's picture
# Adding support for Portiloop V2
# 086b12f
# from EDFlib.edfwriter import EDFwriter
from pyedflib import highlevel
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np
import csv
import time
import os
import warnings
# Directory ~/workspace/edf_recording; presumably where EDF files are stored (confirm against callers)
EDF_PATH = Path.home() / 'workspace' / 'edf_recording'
# Path to the recordings
RECORDING_PATH = Path.home() / 'portiloop-software' / 'portiloop' / 'recordings'
class DummyAlsaMixer:
    """In-memory volume holder exposing ``getvolume`` / ``setvolume``.

    Nothing here talks to an audio device; the level is only stored on the
    instance (presumably a substitute for a real ALSA mixer object when none
    is available -- confirm against callers).
    """

    def __init__(self):
        # Initial level reported until ``setvolume`` is called.
        self.volume = 50

    def getvolume(self):
        """Return the stored volume wrapped in a one-element list."""
        current = self.volume
        return [current]

    def setvolume(self, volume):
        """Store ``volume`` as the new level; no validation is performed."""
        self.volume = volume
# class EDFRecorder:
# def __init__(self, signal_labels, filename, frequency):
# self.filename = filename
# self.nb_signals = len(signal_labels)
# self.samples_per_datarecord_array = frequency
# self.physical_max = 5000000
# self.physical_min = -5000000
# self.signal_labels = signal_labels
# self.edf_buffer = []
# def open_recording_file(self):
# nb_signals = self.nb_signals
# samples_per_datarecord_array = self.samples_per_datarecord_array
# physical_max = self.physical_max
# physical_min = self.physical_min
# signal_labels = self.signal_labels
# print(f"Will store edf recording in {self.filename}")
# self.edf_writer = EDFwriter(p_path=str(self.filename),
# f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
# number_of_signals=nb_signals)
# for signal in range(nb_signals):
# assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
# assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
# assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
# assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
# assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
# assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
# assert self.edf_writer.setPhysicalDimension(signal, 'uV') == 0
# def close_recording_file(self):
# assert self.edf_writer.close() == 0
# def add_recording_data(self, data):
# self.edf_buffer += data
# if len(self.edf_buffer) >= self.samples_per_datarecord_array:
# datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array]
# self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:]
# datarecord_array = np.array(datarecord_array).transpose()
# assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}"
# for d in datarecord_array:
# assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}"
# assert self.edf_writer.writeSamples(d) == 0
class EDFRecorder:
    """Buffer samples into a temporary CSV file, then convert it to EDF.

    Samples are appended with ``add_recording_data`` and written to
    ``csv_filename`` in batches of at least ``max_write`` rows. Calling
    ``close_recording_file`` flushes what is left, converts the CSV to an EDF
    file at ``filename`` through ``pyedflib.highlevel`` and deletes the CSV.

    Args:
        signal_labels: one label per recorded channel (CSV column).
        filename: destination of the EDF file (``str`` or path-like).
        frequency: sample frequency written in every EDF signal header.
    """

    def __init__(self, signal_labels, filename, frequency):
        self.writing_buffer = []
        self.max_write = 1000
        self.filename = filename
        # Swap only the final extension: splitting on the first '.' would
        # truncate any path that has a dot in one of its directories.
        self.csv_filename = str(Path(str(filename)).with_suffix('.csv'))
        self.signal_labels = signal_labels
        self.frequency = frequency
        # Handle on the temporary CSV; set by open_recording_file().
        self.file = None

    def open_recording_file(self):
        """Create (or truncate) the temporary CSV file that receives samples."""
        self.file = open(self.csv_filename, 'w')

    def _flush_buffer(self):
        """Write every buffered sample as one CSV row and empty the buffer."""
        self.file.writelines(
            ','.join(str(elt) for elt in point) + '\n'
            for point in self.writing_buffer
        )
        self.writing_buffer = []

    def close_recording_file(self):
        """Flush pending samples, convert the CSV to EDF and remove the CSV.

        Raises:
            AssertionError: if the number of CSV columns differs from the
                number of signal labels.
        """
        # Samples below the max_write threshold are still only in memory;
        # write them out so the end of the recording is not lost.
        if self.writing_buffer:
            self._flush_buffer()
        self.file.close()

        data = np.genfromtxt(self.csv_filename, delimiter=',')
        # Convert to float values
        data = data.astype(np.float32)
        # genfromtxt collapses a single-row or single-column file to fewer
        # than two dimensions; restore the (samples, channels) layout.
        if data.ndim < 2:
            data = data.reshape(-1, len(self.signal_labels))
        data = data.transpose()
        assert data.shape[0] == len(self.signal_labels), f"{data.shape[0]}!={len(self.signal_labels)}"

        signal_headers = []
        for label, channel in zip(self.signal_labels, data):
            # If we only have zeros in that row, the channel was not activated
            # so we must set the physical max and min manually
            if np.all(channel == 0):
                phys_max = 200
                phys_min = -200
            else:
                phys_max = np.amax(channel)
                phys_min = np.amin(channel)
            # Create the signal header
            signal_headers.append(highlevel.make_signal_header(
                label,
                sample_frequency=self.frequency,
                physical_max=phys_max,
                physical_min=phys_min,
            ))

        self.filename = str(self.filename)
        print(f"Saving to {self.filename}")
        with warnings.catch_warnings():
            # Warnings raised while writing the EDF are deliberately silenced.
            warnings.simplefilter("ignore")
            highlevel.write_edf(str(self.filename), data, signal_headers)
        os.remove(self.csv_filename)

    def add_recording_data(self, data):
        """Queue samples and write the queue to disk once it is large enough.

        Args:
            data: iterable of samples, each sample being an iterable of
                per-channel values.
        """
        self.writing_buffer.extend(data)
        # write to file
        if len(self.writing_buffer) >= self.max_write:
            self._flush_buffer()
class LiveDisplay():
    """Live plot of incoming datapoints, one curve per channel name.

    By default datapoints are forwarded to a ``ProgressPlot``. When the
    ``matplotlib`` attribute is set to True, ``add_datapoints`` instead
    accumulates the value at index 1 of every datapoint and plots the
    accumulated history with matplotlib once it holds at least 1000 values.

    Args:
        channel_names: names of the plotted channels; their count is stored
            as ``datapoint_dim``.
        window_len: passed to ``ProgressPlot`` as ``max_window_len``.
    """

    def __init__(self, channel_names, window_len=100):
        self.datapoint_dim = len(channel_names)
        self.history = []
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
        self.matplotlib = False

    def add_datapoints(self, datapoints):
        """
        Adds a batch of datapoints to the plot

        Args:
            datapoints: list of datapoints, each datapoint being a list with
                one float per channel
        """
        # Every value is wrapped in its own one-element list before display.
        disp_list = [[[elt] for elt in datapoint] for datapoint in datapoints]

        if not self.matplotlib:
            self.pp.update_with_datapoints(disp_list)
            return

        # Imported lazily so matplotlib is only needed in this mode.
        import matplotlib.pyplot as plt

        for wrapped in disp_list:
            # Only the value at index 1 of each datapoint is kept.
            self.history += wrapped[1]

        # '>=' rather than '==': a batch can step over the threshold, after
        # which an equality test would never fire and history would grow
        # without bound.
        if len(self.history) >= 1000:
            plt.plot(self.history)
            plt.show()
            self.history = []

    def add_datapoint(self, datapoint):
        """Add a single datapoint (one value per channel) to the ProgressPlot."""
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)
class FileReader:
    """Replay a CSV file one row at a time at a fixed rate.

    Each row must hold at least four columns: two values parsed as floats
    followed by two flags that are true when the cell is exactly ``'1'``.
    The file is closed automatically once its last row has been read; the
    reader can also be used as a context manager or closed with ``close()``.

    Args:
        filename: path of the CSV file to read.
        frequency: number of rows returned per second (default 250).
    """

    def __init__(self, filename, frequency=250.0):
        # newline='' is what the csv module expects of the files it reads.
        self.file = open(filename, 'r', newline='')
        # Open a csv file
        print(f"Reading from file {filename}")
        self.csv_reader = csv.reader(self.file, delimiter=',')
        self.wait_time = 1 / frequency
        self.index = -1
        self.last_time = time.time()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if not self.file.closed:
            self.file.close()

    def get_point(self):
        """
        Returns the next point in the file

        Waits until ``wait_time`` seconds have passed since the previous
        point was returned.

        Returns:
            ``(index, float, float, bool, bool)`` for the next row, or
            ``None`` once the file is exhausted.
        """
        # The file is closed at end of file; keep answering None afterwards.
        if self.file.closed:
            return None
        try:
            point = next(self.csv_reader)
        except StopIteration:
            self.close()
            return None

        self.index += 1
        # Sleep for what is left of the period instead of spinning on the clock.
        remaining = self.wait_time - (time.time() - self.last_time)
        if remaining > 0:
            time.sleep(remaining)
        self.last_time = time.time()
        return self.index, float(point[0]), float(point[1]), point[2] == '1', point[3] == '1'