Spaces:
Sleeping
Sleeping
File size: 6,898 Bytes
086b12f 476bded a64c5cc 321613a 476bded a64c5cc 321613a a64c5cc 476bded a64c5cc a5c0535 476bded a5c0535 476bded a64c5cc 476bded a64c5cc 476bded a64c5cc 476bded a64c5cc 476bded a64c5cc 476bded a64c5cc 476bded a64c5cc 321613a a64c5cc 321613a 35af352 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# from EDFlib.edfwriter import EDFwriter
from pyedflib import highlevel
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np
import csv
import time
import os
import warnings
EDF_PATH = Path.home() / 'workspace' / 'edf_recording'
# Path to the recordings
RECORDING_PATH = Path.home() / 'portiloop-software' / 'portiloop' / 'recordings'
class DummyAlsaMixer:
def __init__(self):
self.volume = 50
def getvolume(self):
return [self.volume]
def setvolume(self, volume):
self.volume = volume
# class EDFRecorder:
# def __init__(self, signal_labels, filename, frequency):
# self.filename = filename
# self.nb_signals = len(signal_labels)
# self.samples_per_datarecord_array = frequency
# self.physical_max = 5000000
# self.physical_min = -5000000
# self.signal_labels = signal_labels
# self.edf_buffer = []
# def open_recording_file(self):
# nb_signals = self.nb_signals
# samples_per_datarecord_array = self.samples_per_datarecord_array
# physical_max = self.physical_max
# physical_min = self.physical_min
# signal_labels = self.signal_labels
# print(f"Will store edf recording in {self.filename}")
# self.edf_writer = EDFwriter(p_path=str(self.filename),
# f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
# number_of_signals=nb_signals)
# for signal in range(nb_signals):
# assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
# assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
# assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
# assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
# assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
# assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
# assert self.edf_writer.setPhysicalDimension(signal, 'uV') == 0
# def close_recording_file(self):
# assert self.edf_writer.close() == 0
# def add_recording_data(self, data):
# self.edf_buffer += data
# if len(self.edf_buffer) >= self.samples_per_datarecord_array:
# datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array]
# self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:]
# datarecord_array = np.array(datarecord_array).transpose()
# assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}"
# for d in datarecord_array:
# assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}"
# assert self.edf_writer.writeSamples(d) == 0
class EDFRecorder:
def __init__(self, signal_labels, filename, frequency):
self.writing_buffer = []
self.max_write = 1000
self.filename = filename
self.csv_filename = str(filename).split('.')[0] + '.csv'
self.signal_labels = signal_labels
self.frequency = frequency
def open_recording_file(self):
self.file = open(self.csv_filename, 'w')
def close_recording_file(self):
self.file.close()
data = np.genfromtxt(self.csv_filename, delimiter=',')
# Convert to float values
data = data.astype(np.float32)
data = data.transpose()
assert data.shape[0] == len(self.signal_labels), f"{data.shape[0]}!={len(self.signal_labels)}"
signal_headers = []
for row_i in range(data.shape[0]):
# If we only have zeros in that row, the channel was not activated so we must set the physical max and min manually
if np.all(data[row_i] == 0):
phys_max = 200
phys_min = -200
else:
phys_max = np.amax(data[row_i])
phys_min = np.amin(data[row_i])
# Create the signal header
signal_headers.append(highlevel.make_signal_header(
self.signal_labels[row_i],
sample_frequency=self.frequency,
physical_max=phys_max,
physical_min=phys_min,))
self.filename = str(self.filename)
print(f"Saving to {self.filename}")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
highlevel.write_edf(str(self.filename), data, signal_headers)
os.remove(self.csv_filename)
def add_recording_data(self, data):
self.writing_buffer += data
# write to file
if len(self.writing_buffer) >= self.max_write:
for point in self.writing_buffer:
self.file.write(','.join([str(elt) for elt in point]) + '\n')
self.writing_buffer = []
class LiveDisplay():
def __init__(self, channel_names, window_len=100):
self.datapoint_dim = len(channel_names)
self.history = []
self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
self.matplotlib = False
def add_datapoints(self, datapoints):
"""
Adds 8 lists of datapoints to the plot
Args:
datapoints: list of 8 lists of floats (or list of 8 floats)
"""
if self.matplotlib:
import matplotlib.pyplot as plt
disp_list = []
for datapoint in datapoints:
d = [[elt] for elt in datapoint]
disp_list.append(d)
if self.matplotlib:
self.history += d[1]
if not self.matplotlib:
self.pp.update_with_datapoints(disp_list)
elif len(self.history) == 1000:
plt.plot(self.history)
plt.show()
self.history = []
def add_datapoint(self, datapoint):
disp_list = [[elt] for elt in datapoint]
self.pp.update(disp_list)
class FileReader:
def __init__(self, filename):
file = open(filename, 'r')
# Open a csv file
print(f"Reading from file {filename}")
self.csv_reader = csv.reader(file, delimiter=',')
self.wait_time = 1/250.0
self.index = -1
self.last_time = time.time()
def get_point(self):
"""
Returns the next point in the file
"""
try:
point = next(self.csv_reader)
self.index += 1
while time.time() - self.last_time < self.wait_time:
continue
self.last_time = time.time()
return self.index, float(point[0]), float(point[1]), point[2] == '1', point[3] == '1'
except StopIteration:
return None |