# from EDFlib.edfwriter import EDFwriter
from pyedflib import highlevel
from portilooplot.jupyter_plot import ProgressPlot
from pathlib import Path
import numpy as np
import csv
import time
import os
import warnings


EDF_PATH = Path.home() / 'workspace' / 'edf_recording'
# Path to the recordings
RECORDING_PATH = Path.home() / 'portiloop-software' / 'portiloop' / 'recordings'


class DummyAlsaMixer:
    """Fallback stand-in for an ALSA mixer, exposing getvolume/setvolume on a dummy value."""
    def __init__(self):
        self.volume = 50

    def getvolume(self):
        return [self.volume]

    def setvolume(self, volume):
        self.volume = volume


# class EDFRecorder:
#     def __init__(self, signal_labels, filename, frequency):
#         self.filename = filename
#         self.nb_signals = len(signal_labels)
#         self.samples_per_datarecord_array = frequency
#         self.physical_max = 5000000
#         self.physical_min = -5000000
#         self.signal_labels = signal_labels
#         self.edf_buffer = []

#     def open_recording_file(self):
#         nb_signals = self.nb_signals
#         samples_per_datarecord_array = self.samples_per_datarecord_array
#         physical_max = self.physical_max
#         physical_min = self.physical_min
#         signal_labels = self.signal_labels

#         print(f"Will store edf recording in {self.filename}")

#         self.edf_writer = EDFwriter(p_path=str(self.filename),
#                                     f_file_type=EDFwriter.EDFLIB_FILETYPE_EDFPLUS,
#                                     number_of_signals=nb_signals)
        
#         for signal in range(nb_signals):
#             assert self.edf_writer.setSampleFrequency(signal, samples_per_datarecord_array) == 0
#             assert self.edf_writer.setPhysicalMaximum(signal, physical_max) == 0
#             assert self.edf_writer.setPhysicalMinimum(signal, physical_min) == 0
#             assert self.edf_writer.setDigitalMaximum(signal, 32767) == 0
#             assert self.edf_writer.setDigitalMinimum(signal, -32768) == 0
#             assert self.edf_writer.setSignalLabel(signal, signal_labels[signal]) == 0
#             assert self.edf_writer.setPhysicalDimension(signal, 'uV') == 0

#     def close_recording_file(self):
#         assert self.edf_writer.close() == 0
    
#     def add_recording_data(self, data):
#         self.edf_buffer += data
#         if len(self.edf_buffer) >= self.samples_per_datarecord_array:
#             datarecord_array = self.edf_buffer[:self.samples_per_datarecord_array]
#             self.edf_buffer = self.edf_buffer[self.samples_per_datarecord_array:]
#             datarecord_array = np.array(datarecord_array).transpose()
#             assert len(datarecord_array) == self.nb_signals, f"len(data)={len(data)}!={self.nb_signals}"
#             for d in datarecord_array:
#                 assert len(d) == self.samples_per_datarecord_array, f"{len(d)}!={self.samples_per_datarecord_array}"
#                 assert self.edf_writer.writeSamples(d) == 0

class EDFRecorder:
    """Buffers incoming datapoints into a CSV file, then converts it to EDF when closed."""
    def __init__(self, signal_labels, filename, frequency):
        self.writing_buffer = []
        self.max_write = 1000  # number of buffered datapoints before they are flushed to disk
        self.filename = filename
        # splitext keeps the rest of the path intact even if it contains other dots
        self.csv_filename = os.path.splitext(str(filename))[0] + '.csv'
        self.signal_labels = signal_labels
        self.frequency = frequency

    def open_recording_file(self):
        self.file = open(self.csv_filename, 'w')

    def close_recording_file(self):
        # Flush any datapoints still sitting in the write buffer so the tail of the recording is not lost
        for point in self.writing_buffer:
            self.file.write(','.join([str(elt) for elt in point]) + '\n')
        self.writing_buffer = []
        self.file.close()
        data = np.genfromtxt(self.csv_filename, delimiter=',')
        # Convert to float values
        data = data.astype(np.float32)
        data = data.transpose()
        assert data.shape[0] == len(self.signal_labels), f"{data.shape[0]}!={len(self.signal_labels)}"
        signal_headers = []
        for row_i in range(data.shape[0]):
            # If we only have zeros in that row, the channel was not activated so we must set the physical max and min manually
            if np.all(data[row_i] == 0):
                phys_max = 200
                phys_min = -200
            else:
                phys_max = np.amax(data[row_i])
                phys_min = np.amin(data[row_i])

            # Create the signal header
            signal_headers.append(highlevel.make_signal_header(
                self.signal_labels[row_i], 
                sample_frequency=self.frequency,
                physical_max=phys_max,
                physical_min=phys_min,))
        self.filename = str(self.filename)
        print(f"Saving to {self.filename}")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            highlevel.write_edf(str(self.filename), data, signal_headers)

        os.remove(self.csv_filename)

    def add_recording_data(self, data):
        self.writing_buffer += data
        # write to file
        if len(self.writing_buffer) >= self.max_write:
            for point in self.writing_buffer:
                self.file.write(','.join([str(elt) for elt in point]) + '\n')
            self.writing_buffer = []
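

# Illustrative usage sketch (not called anywhere in this module); the channel labels,
# sampling rate and output file name below are placeholder values. The expected call
# sequence is open_recording_file() -> add_recording_data(...) -> close_recording_file().
def _demo_edf_recorder():
    labels = [f"ch{i}" for i in range(1, 9)]
    EDF_PATH.mkdir(parents=True, exist_ok=True)  # make sure the output directory exists
    recorder = EDFRecorder(labels, EDF_PATH / 'demo.edf', frequency=250)
    recorder.open_recording_file()
    # Four seconds of random data at 250 Hz, one value per channel per datapoint
    for _ in range(1000):
        recorder.add_recording_data([list(np.random.randn(len(labels)))])
    recorder.close_recording_file()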

        

class LiveDisplay:
    def __init__(self, channel_names, window_len=100):
        self.datapoint_dim = len(channel_names)
        self.history = []
        self.pp = ProgressPlot(plot_names=channel_names, max_window_len=window_len)
        self.matplotlib = False

    def add_datapoints(self, datapoints):
        """
        Adds 8 lists of datapoints to the plot
        
        Args:
            datapoints: list of 8 lists of floats (or list of 8 floats)
        """
        if self.matplotlib:
            import matplotlib.pyplot as plt
        disp_list = []
        for datapoint in datapoints:
            d = [[elt] for elt in datapoint]
            disp_list.append(d)
            
            if self.matplotlib:
                # Keep only the values of channel index 1 for the matplotlib history
                self.history += d[1]

        if not self.matplotlib:
            self.pp.update_with_datapoints(disp_list)
        elif len(self.history) >= 1000:
            # Plot and reset the history once at least 1000 points have accumulated
            plt.plot(self.history)
            plt.show()
            self.history = []
    
    def add_datapoint(self, datapoint):
        disp_list = [[elt] for elt in datapoint]
        self.pp.update(disp_list)
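

# Illustrative usage sketch (not called anywhere in this module); the channel names are
# placeholders and ProgressPlot is meant to render inside a Jupyter notebook.
def _demo_live_display():
    channels = [f"ch{i}" for i in range(1, 9)]
    display = LiveDisplay(channel_names=channels, window_len=100)
    for _ in range(100):
        datapoint = list(np.random.randn(len(channels)))  # one value per channel
        display.add_datapoints([datapoint])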


class FileReader:
    """Replays a previously recorded CSV file, pacing reads at roughly 250 Hz."""
    def __init__(self, filename):
        # Open the csv file and keep the handle so it stays alive for the reader
        self.file = open(filename, 'r')
        print(f"Reading from file {filename}")
        self.csv_reader = csv.reader(self.file, delimiter=',')
        self.wait_time = 1/250.0  # minimum delay between two consecutive points (250 Hz)
        self.index = -1
        self.last_time = time.time()

    def get_point(self):
        """
        Returns the next point in the file, or None at the end of the file.
        Successive points are spaced by at least `wait_time` to pace reads at roughly 250 Hz.
        """
        try:
            point = next(self.csv_reader)
            self.index += 1
            # Busy-wait so points are not returned faster than the original sampling rate
            while time.time() - self.last_time < self.wait_time:
                continue
            self.last_time = time.time()
            return self.index, float(point[0]), float(point[1]), point[2] == '1', point[3] == '1'
        except StopIteration:
            return None
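

# Illustrative usage sketch (not called anywhere in this module); the file name is a
# placeholder and must point to a CSV whose rows have at least four columns.
def _demo_file_reader():
    reader = FileReader(RECORDING_PATH / 'example_recording.csv')
    while True:
        point = reader.get_point()
        if point is None:
            break  # end of file reached
        index, value_0, value_1, flag_0, flag_1 = point
        print(index, value_0, value_1, flag_0, flag_1)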