Milo Sobral
Working on making things work outside of the hardware of the Portiloop
fcba0a9
raw
history blame
38.8 kB
from time import sleep
import time
import numpy as np
from copy import deepcopy
from datetime import datetime
import multiprocessing as mp
import warnings
from threading import Thread, Lock
from portiloop.src import ADS
if ADS:
import alsaaudio
from portiloop.src.hardware.frontend import Frontend
from portiloop.src.hardware.leds import LEDs, Color
from portiloop.src.stimulation import UpStateDelayer
from portiloop.src.processing import FilterPipeline, int_to_float
from portiloop.src.config import mod_config, LEADOFF_CONFIG, FRONTEND_CONFIG, to_ads_frequency
from portiloop.src.utils import FileReader, LiveDisplay, DummyAlsaMixer, EDFRecorder, EDF_PATH
from IPython.display import clear_output, display
import ipywidgets as widgets
def capture_process(p_data_o, p_msg_io, duration, frequency, python_clock, time_msg_in, channel_states):
"""
Args:
p_data_o: multiprocessing.Pipe: captured datapoints are put here
p_msg_io: mutliprocessing.Pipe: to communicate with the parent process
duration: float: max duration of the experiment in seconds
frequency: float: sampling frequency
ptyhon_clock: bool: if True, the Coral clock is used, otherwise, the ADS interrupts are used
time_msg_in: float: min time between attempts to recv incomming messages
"""
if duration <= 0:
duration = np.inf
sample_time = 1 / frequency
frontend = Frontend()
leds = LEDs()
leds.led2(Color.PURPLE)
leds.aquisition(True)
try:
data = frontend.read_regs(0x00, 1)
assert data == [0x3E], "The communication with the ADS failed, please try again."
leds.led2(Color.BLUE)
config = FRONTEND_CONFIG
if python_clock: # set ADS to 2 * frequency
datarate = 2 * frequency
else: # set ADS to frequency
datarate = frequency
config = mod_config(config, datarate, channel_states)
frontend.write_regs(0x00, config)
data = frontend.read_regs(0x00, len(config))
assert data == config, f"Wrong config: {data} vs {config}"
frontend.start()
leds.led2(Color.PURPLE)
while not frontend.is_ready():
pass
# Set up of leds
leds.aquisition(True)
sleep(0.5)
leds.aquisition(False)
sleep(0.5)
leds.aquisition(True)
c = True
it = 0
t_start = time.time()
t_max = t_start + duration
t = t_start
# first sample:
reading = frontend.read()
datapoint = reading.channels()
p_data_o.send(datapoint)
t_next = t + sample_time
t_chk_msg = t + time_msg_in
# sampling loop:
while c and t < t_max:
t = time.time()
if python_clock:
if t <= t_next:
time.sleep(t_next - t)
t_next += sample_time
reading = frontend.read()
else:
reading = frontend.wait_new_data()
datapoint = reading.channels()
p_data_o.send(datapoint)
# Check for messages
if t >= t_chk_msg:
t_chk_msg = t + time_msg_in
if p_msg_io.poll():
message = p_msg_io.recv()
if message == 'STOP':
c = False
it += 1
t = time.time()
tot = (t - t_start) / it
p_msg_io.send(("PRT", f"Average frequency: {1 / tot} Hz for {it} samples"))
finally:
leds.aquisition(False)
leds.close()
frontend.close()
p_msg_io.send('STOP')
p_msg_io.close()
p_data_o.close()
class Capture:
def __init__(self, detector_cls=None, stimulator_cls=None):
# {now.strftime('%m_%d_%Y_%H_%M_%S')}
self.filename = EDF_PATH / 'recording.edf'
self._p_capture = None
self.__capture_on = False
self.frequency = 250
self.duration = 28800
self.power_line = 60
self.polyak_mean = 0.1
self.polyak_std = 0.001
self.epsilon = 0.000001
self.custom_fir = False
self.custom_fir_order = 20
self.custom_fir_cutoff = 30
self.filter = True
self.filter_args = [True, True, True]
self.record = False
self.detect = False
self.stimulate = False
self.threshold = 0.5
self.lsl = False
self.display = False
self.signal_input = "ADS"
self.python_clock = True
self.edf_writer = None
self.edf_buffer = []
self.signal_labels = ['Common Mode', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8']
self._lock_msg_out = Lock()
self._msg_out = None
self._t_capture = None
self.channel_states = ['disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled', 'disabled']
self.channel_detection = 2
self.spindle_detection_mode = 'Fast'
self.spindle_freq = 10
self.detector_cls = detector_cls
self.stimulator_cls = stimulator_cls
self._test_stimulus_lock = Lock()
self._test_stimulus = False
self._pause_detect_lock = Lock()
self._pause_detect = True
if ADS:
try:
mixers = alsaaudio.mixers()
if len(mixers) <= 0:
warnings.warn(f"No ALSA mixer found.")
self.mixer = DummyAlsaMixer()
elif 'PCM' in mixers:
self.mixer = alsaaudio.Mixer(control='PCM')
else:
warnings.warn(f"Could not find mixer PCM, using {mixers[0]} instead.")
self.mixer = alsaaudio.Mixer(control=mixers[0])
except ALSAAudioError as e:
warnings.warn(f"No ALSA mixer found.")
self.mixer = DummyAlsaMixer()
self.volume = self.mixer.getvolume()[0] # we will set the same volume on all channels
else:
self.mixer = DummyAlsaMixer()
self.volume = self.mixer.getvolume()[0]
# widgets ===============================
# CHANNELS ------------------------------
# self.b_radio_ch1 = widgets.RadioButtons(
# options=['disabled', 'simple'],
# value='disabled',
# disabled=True
# )
self.b_radio_ch2 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch3 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch4 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch5 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch6 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch7 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_radio_ch8 = widgets.RadioButtons(
options=['disabled', 'simple'],
value='disabled',
disabled=False
)
self.b_channel_detect = widgets.Dropdown(
options=[('2', 2), ('3', 3), ('4', 4), ('5', 5), ('6', 6), ('7', 7), ('8', 8)],
value=2,
description='Detection Channel:',
disabled=False,
style={'description_width': 'initial'}
)
self.b_spindle_mode = widgets.Dropdown(
options=['Fast', 'Peak', 'Through'],
value='Fast',
description='Spindle Stimulation Mode',
disabled=False,
style={'description_width': 'initial'}
)
self.b_spindle_freq = widgets.IntText(
value=self.spindle_freq,
description='Spindle Freq (Hz):',
disabled=False,
style={'description_width': 'initial'}
)
self.b_accordion_channels = widgets.Accordion(
children=[
widgets.GridBox([
widgets.Label('CH2'),
widgets.Label('CH3'),
widgets.Label('CH4'),
widgets.Label('CH5'),
widgets.Label('CH6'),
widgets.Label('CH7'),
widgets.Label('CH8'),
self.b_radio_ch2,
self.b_radio_ch3,
self.b_radio_ch4,
self.b_radio_ch5,
self.b_radio_ch6,
self.b_radio_ch7,
self.b_radio_ch8], layout=widgets.Layout(grid_template_columns="repeat(7, 90px)")
)
])
self.b_accordion_channels.set_title(index = 0, title = 'Channels')
# OTHERS ------------------------------
self.b_capture = widgets.ToggleButtons(
options=['Stop', 'Start'],
description='Capture:',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['Stop capture', 'Start capture'],
)
self.b_pause = widgets.ToggleButtons(
options=['Paused', 'Active'],
description='Detection',
disabled=True,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['Detector and stimulator active', 'Detector and stimulator paused'],
)
self.b_clock = widgets.ToggleButtons(
options=['ADS', 'Coral'],
description='Clock:',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['Use Coral clock (very precise, not very timely)',
'Use ADS clock (not very precise, very timely)'],
)
self.b_power_line = widgets.ToggleButtons(
options=['60 Hz', '50 Hz'],
description='Power line:',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['North America 60 Hz',
'Europe 50 Hz'],
)
self.b_signal_input = widgets.ToggleButtons(
options=['ADS', 'File'],
description='Signal Input:',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['Read data from ADS.',
'Read data from file.'],
)
self.b_custom_fir = widgets.ToggleButtons(
options=['Default', 'Custom'],
description='FIR filter:',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltips=['Use the default 30Hz low-pass FIR from the Portiloop paper',
'Use a custom FIR'],
)
self.b_filename = widgets.Text(
value='recording.edf',
description='Recording:',
disabled=False
)
self.b_frequency = widgets.IntText(
value=self.frequency,
description='Freq (Hz):',
disabled=False
)
self.b_threshold = widgets.FloatText(
value=self.threshold,
description='Threshold:',
disabled=True
)
self.b_polyak_mean = widgets.FloatText(
value=self.polyak_mean,
description='Polyak mean:',
disabled=False
)
self.b_polyak_std = widgets.FloatText(
value=self.polyak_std,
description='Polyak std:',
disabled=False
)
self.b_epsilon = widgets.FloatText(
value=self.epsilon,
description='Epsilon:',
disabled=False
)
self.b_custom_fir_order = widgets.IntText(
value=self.custom_fir_order,
description='FIR order:',
disabled=True
)
self.b_custom_fir_cutoff = widgets.IntText(
value=self.custom_fir_cutoff,
description='FIR cutoff:',
disabled=True
)
self.b_use_fir = widgets.Checkbox(
value=self.filter_args[0],
description='Use FIR',
disabled=False,
indent=False
)
self.b_use_notch = widgets.Checkbox(
value=self.filter_args[1],
description='Use notch',
disabled=False,
indent=False
)
self.b_use_std = widgets.Checkbox(
value=self.filter_args[2],
description='Use standardization',
disabled=False,
indent=False
)
self.b_accordion_filter = widgets.Accordion(
children=[
widgets.VBox([
self.b_custom_fir,
self.b_custom_fir_order,
self.b_custom_fir_cutoff,
self.b_polyak_mean,
self.b_polyak_std,
self.b_epsilon,
widgets.HBox([
self.b_use_fir,
self.b_use_notch,
self.b_use_std
])
])
])
self.b_accordion_filter.set_title(index = 0, title = 'Filtering')
self.b_duration = widgets.IntText(
value=self.duration,
description='Time (s):',
disabled=False
)
self.b_filter = widgets.Checkbox(
value=self.filter,
description='Filter',
disabled=False,
indent=False
)
self.b_detect = widgets.Checkbox(
value=self.detect,
description='Detect',
disabled=False,
indent=False
)
self.b_stimulate = widgets.Checkbox(
value=self.stimulate,
description='Stimulate',
disabled=True,
indent=False
)
self.b_record = widgets.Checkbox(
value=self.record,
description='Record EDF',
disabled=False,
indent=False
)
self.b_lsl = widgets.Checkbox(
value=self.lsl,
description='Stream LSL',
disabled=False,
indent=False
)
self.b_display = widgets.Checkbox(
value=self.display,
description='Display',
disabled=False,
indent=False
)
self.b_volume = widgets.IntSlider(
value=self.volume,
min=0,
max=100,
step=1,
description="Volume",
disabled=False
)
self.b_test_stimulus = widgets.Button(
description='Test stimulus',
disabled=True,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltip='Send a test stimulus'
)
self.b_test_impedance = widgets.Button(
description='Impedance Check',
disabled=False,
button_style='', # 'success', 'info', 'warning', 'danger' or ''
tooltip='Check if electrodes are properly connected'
)
# CALLBACKS ----------------------
self.b_capture.observe(self.on_b_capture, 'value')
self.b_clock.observe(self.on_b_clock, 'value')
self.b_frequency.observe(self.on_b_frequency, 'value')
self.b_threshold.observe(self.on_b_threshold, 'value')
self.b_duration.observe(self.on_b_duration, 'value')
self.b_filter.observe(self.on_b_filter, 'value')
self.b_use_fir.observe(self.on_b_use_fir, 'value')
self.b_use_notch.observe(self.on_b_use_notch, 'value')
self.b_use_std.observe(self.on_b_use_std, 'value')
self.b_detect.observe(self.on_b_detect, 'value')
self.b_stimulate.observe(self.on_b_stimulate, 'value')
self.b_record.observe(self.on_b_record, 'value')
self.b_lsl.observe(self.on_b_lsl, 'value')
self.b_display.observe(self.on_b_display, 'value')
self.b_filename.observe(self.on_b_filename, 'value')
self.b_radio_ch2.observe(self.on_b_radio_ch2, 'value')
self.b_radio_ch3.observe(self.on_b_radio_ch3, 'value')
self.b_radio_ch4.observe(self.on_b_radio_ch4, 'value')
self.b_radio_ch5.observe(self.on_b_radio_ch5, 'value')
self.b_radio_ch6.observe(self.on_b_radio_ch6, 'value')
self.b_radio_ch7.observe(self.on_b_radio_ch7, 'value')
self.b_radio_ch8.observe(self.on_b_radio_ch8, 'value')
self.b_channel_detect.observe(self.on_b_channel_detect, 'value')
self.b_spindle_mode.observe(self.on_b_spindle_mode, 'value')
self.b_spindle_freq.observe(self.on_b_spindle_freq, 'value')
self.b_power_line.observe(self.on_b_power_line, 'value')
self.b_signal_input.observe(self.on_b_power_line, 'value')
self.b_custom_fir.observe(self.on_b_custom_fir, 'value')
self.b_custom_fir_order.observe(self.on_b_custom_fir_order, 'value')
self.b_custom_fir_cutoff.observe(self.on_b_custom_fir_cutoff, 'value')
self.b_polyak_mean.observe(self.on_b_polyak_mean, 'value')
self.b_polyak_std.observe(self.on_b_polyak_std, 'value')
self.b_epsilon.observe(self.on_b_epsilon, 'value')
self.b_volume.observe(self.on_b_volume, 'value')
self.b_test_stimulus.on_click(self.on_b_test_stimulus)
self.b_test_impedance.on_click(self.on_b_test_impedance)
self.b_pause.observe(self.on_b_pause, 'value')
self.display_buttons()
def __del__(self):
self.b_capture.close()
def display_buttons(self):
display(widgets.VBox([self.b_accordion_channels,
self.b_channel_detect,
self.b_frequency,
self.b_duration,
self.b_filename,
self.b_signal_input,
self.b_power_line,
self.b_clock,
widgets.HBox([self.b_filter, self.b_detect, self.b_stimulate, self.b_record, self.b_lsl, self.b_display]),
widgets.HBox([self.b_threshold, self.b_test_stimulus]),
self.b_volume,
widgets.HBox([self.b_spindle_mode, self.b_spindle_freq]),
self.b_test_impedance,
self.b_accordion_filter,
self.b_capture,
self.b_pause]))
def enable_buttons(self):
self.b_frequency.disabled = False
self.b_duration.disabled = False
self.b_filename.disabled = False
self.b_filter.disabled = False
self.b_detect.disabled = False
self.b_record.disabled = False
self.b_lsl.disabled = False
self.b_display.disabled = False
self.b_clock.disabled = False
self.b_radio_ch2.disabled = False
self.b_radio_ch3.disabled = False
self.b_radio_ch4.disabled = False
self.b_radio_ch5.disabled = False
self.b_radio_ch6.disabled = False
self.b_radio_ch7.disabled = False
self.b_radio_ch8.disabled = False
self.b_power_line.disabled = False
self.b_signal_input.disabled = False
self.b_channel_detect.disabled = False
self.b_spindle_freq.disabled = False
self.b_spindle_mode.disabled = False
self.b_polyak_mean.disabled = False
self.b_polyak_std.disabled = False
self.b_epsilon.disabled = False
self.b_use_fir.disabled = False
self.b_use_notch.disabled = False
self.b_use_std.disabled = False
self.b_custom_fir.disabled = False
self.b_custom_fir_order.disabled = not self.custom_fir
self.b_custom_fir_cutoff.disabled = not self.custom_fir
self.b_stimulate.disabled = not self.detect
self.b_threshold.disabled = not self.detect
self.b_pause.disabled = not self.detect
self.b_test_stimulus.disabled = True # only enabled when running
self.b_test_impedance.disabled = False
def disable_buttons(self):
self.b_frequency.disabled = True
self.b_duration.disabled = True
self.b_filename.disabled = True
self.b_filter.disabled = True
self.b_stimulate.disabled = True
self.b_filter.disabled = True
self.b_detect.disabled = True
self.b_record.disabled = True
self.b_lsl.disabled = True
self.b_display.disabled = True
self.b_clock.disabled = True
self.b_radio_ch2.disabled = True
self.b_radio_ch3.disabled = True
self.b_radio_ch4.disabled = True
self.b_radio_ch5.disabled = True
self.b_radio_ch6.disabled = True
self.b_radio_ch7.disabled = True
self.b_radio_ch8.disabled = True
self.b_channel_detect.disabled = True
self.b_spindle_freq.disabled = True
self.b_spindle_mode.disabled = True
self.b_signal_input.disabled = True
self.b_power_line.disabled = True
self.b_polyak_mean.disabled = True
self.b_polyak_std.disabled = True
self.b_epsilon.disabled = True
self.b_use_fir.disabled = True
self.b_use_notch.disabled = True
self.b_use_std.disabled = True
self.b_custom_fir.disabled = True
self.b_custom_fir_order.disabled = True
self.b_custom_fir_cutoff.disabled = True
self.b_threshold.disabled = True
self.b_test_stimulus.disabled = not self.stimulate # only enabled when running
self.b_test_impedance.disabled = True
def on_b_radio_ch2(self, value):
self.channel_states[0] = value['new']
def on_b_radio_ch3(self, value):
self.channel_states[1] = value['new']
def on_b_radio_ch4(self, value):
self.channel_states[2] = value['new']
def on_b_radio_ch5(self, value):
self.channel_states[3] = value['new']
def on_b_radio_ch6(self, value):
self.channel_states[4] = value['new']
def on_b_radio_ch7(self, value):
self.channel_states[5] = value['new']
def on_b_radio_ch8(self, value):
self.channel_states[6] = value['new']
def on_b_channel_detect(self, value):
self.channel_detection = value['new']
def on_b_spindle_freq(self, value):
val = value['new']
if val > 0:
self.spindle_freq = val
else:
self.b_spindle_freq.value = self.spindle_freq
def on_b_spindle_mode(self, value):
self.spindle_detection_mode = value['new']
def on_b_capture(self, value):
val = value['new']
if val == 'Start':
clear_output()
self.disable_buttons()
if not self.python_clock: # ADS clock: force the frequency to an ADS-compatible frequency
self.frequency = to_ads_frequency(self.frequency)
self.b_frequency.value = self.frequency
self.display_buttons()
with self._lock_msg_out:
self._msg_out = None
if self._t_capture is not None:
warnings.warn("Capture already running, operation aborted.")
return
detector_cls = self.detector_cls if self.detect else None
stimulator_cls = self.stimulator_cls if self.stimulate else None
self._t_capture = Thread(target=self.start_capture,
args=(self.filter,
self.filter_args,
detector_cls,
self.threshold,
self.channel_detection,
stimulator_cls,
self.record,
self.lsl,
self.display,
2500,
self.python_clock))
self._t_capture.start()
elif val == 'Stop':
with self._lock_msg_out:
self._msg_out = 'STOP'
assert self._t_capture is not None
self._t_capture.join()
self._t_capture = None
self.enable_buttons()
def on_b_custom_fir(self, value):
val = value['new']
if val == 'Default':
self.custom_fir = False
elif val == 'Custom':
self.custom_fir = True
self.enable_buttons()
def on_b_clock(self, value):
val = value['new']
if val == 'Coral':
self.python_clock = True
elif val == 'ADS':
self.python_clock = False
def on_b_signal_input(self, value):
val = value['new']
if val == "ADS":
self.signal_input = "ADS"
elif val == "File":
self.signal_input = "File"
def on_b_power_line(self, value):
val = value['new']
if val == '60 Hz':
self.power_line = 60
elif val == '50 Hz':
self.power_line = 50
def on_b_frequency(self, value):
val = value['new']
if val > 0:
self.frequency = val
else:
self.b_frequency.value = self.frequency
def on_b_threshold(self, value):
val = value['new']
if val >= 0 and val <= 1:
self.threshold = val
else:
self.b_threshold.value = self.threshold
def on_b_filename(self, value):
val = value['new']
if val != '':
if not val.endswith('.edf'):
val += '.edf'
self.filename = EDF_PATH / val
else:
now = datetime.now()
self.filename = EDF_PATH / 'recording.edf'
def on_b_duration(self, value):
val = value['new']
if val > 0:
self.duration = val
def on_b_custom_fir_order(self, value):
val = value['new']
if val > 0:
self.custom_fir_order = val
else:
self.b_custom_fir_order.value = self.custom_fir_order
def on_b_custom_fir_cutoff(self, value):
val = value['new']
if val > 0 and val < self.frequency / 2:
self.custom_fir_cutoff = val
else:
self.b_custom_fir_cutoff.value = self.custom_fir_cutoff
def on_b_polyak_mean(self, value):
val = value['new']
if val >= 0 and val <= 1:
self.polyak_mean = val
else:
self.b_polyak_mean.value = self.polyak_mean
def on_b_polyak_std(self, value):
val = value['new']
if val >= 0 and val <= 1:
self.polyak_std = val
else:
self.b_polyak_std.value = self.polyak_std
def on_b_epsilon(self, value):
val = value['new']
if val > 0 and val < 0.1:
self.epsilon = val
else:
self.b_epsilon.value = self.epsilon
def on_b_filter(self, value):
val = value['new']
self.filter = val
def on_b_use_fir(self, value):
val = value['new']
self.filter_args[0] = val
def on_b_use_notch(self, value):
val = value['new']
self.filter_args[1] = val
def on_b_use_std(self, value):
val = value['new']
self.filter_args[2] = val
def on_b_stimulate(self, value):
val = value['new']
self.stimulate = val
def on_b_detect(self, value):
val = value['new']
self.detect = val
self.enable_buttons()
def on_b_record(self, value):
val = value['new']
self.record = val
def on_b_lsl(self, value):
val = value['new']
self.lsl = val
def on_b_display(self, value):
val = value['new']
self.display = val
def on_b_volume(self, value):
val = value['new']
if val >= 0 and val <= 100:
self.volume = val
self.mixer.setvolume(self.volume)
def on_b_test_stimulus(self, b):
with self._test_stimulus_lock:
self._test_stimulus = True
def on_b_test_impedance(self, b):
frontend = Frontend()
def is_set(x, n):
return x & 1 << n != 0
try:
frontend.write_regs(0x00, LEADOFF_CONFIG)
frontend.start()
start_time = time.time()
current_time = time.time()
while current_time - start_time < 2:
current_time = time.time()
reading = frontend.read()
# Check if any of the negative bits are set and initialize the impedance array
# impedance_check = [any([is_set(leadoff_n, i) for i in range(2, 9)])]
impedance_check = [any([reading.loff_n(i) for i in range(7)])]
for i in range(7):
impedance_check.append(reading.loff_p(i))
def print_impedance(impedance):
names = ["Ref", "Ch2", "Ch3", "Ch4", "Ch5", "Ch6", "Ch7", "Ch8"]
vals = [' Y ' if val else ' N ' for val in impedance]
print(' '.join(str(name) for name in names))
print(' '.join(str(val) for val in vals))
print_impedance(impedance_check)
finally:
frontend.close()
def on_b_pause(self, value):
val = value['new']
if val == 'Active':
with self._pause_detect_lock:
self._pause_detect = False
elif val == 'Paused':
with self._pause_detect_lock:
self._pause_detect = True
def start_capture(self,
filter,
filter_args,
detector_cls,
threshold,
channel,
stimulator_cls,
record,
lsl,
viz,
width,
python_clock):
if self.signal_input == "ADS":
if self.__capture_on:
warnings.warn("Capture is already ongoing, ignoring command.")
return
else:
self.__capture_on = True
p_msg_io, p_msg_io_2 = mp.Pipe()
p_data_i, p_data_o = mp.Pipe(duplex=False)
# Initialize filtering pipeline
if filter:
fp = FilterPipeline(nb_channels=8,
sampling_rate=self.frequency,
power_line_fq=self.power_line,
use_custom_fir=self.custom_fir,
custom_fir_order=self.custom_fir_order,
custom_fir_cutoff=self.custom_fir_cutoff,
alpha_avg=self.polyak_mean,
alpha_std=self.polyak_std,
epsilon=self.epsilon,
filter_args=filter_args)
# Initialize detector and stimulator
detector = detector_cls(threshold, channel=channel) if detector_cls is not None else None
stimulator = stimulator_cls() if stimulator_cls is not None else None
# Launch the capture process
if self.signal_input == "ADS":
self._p_capture = mp.Process(target=capture_process,
args=(p_data_o,
p_msg_io_2,
self.duration,
self.frequency,
python_clock,
1.0,
self.channel_states)
)
self._p_capture.start()
print(f"PID capture: {self._p_capture.pid}")
else:
filename = "INSERT FILENAME" # TODO
file_reader = FileReader(filename)
# Initialize display if requested
if viz:
live_disp = LiveDisplay(channel_names = self.signal_labels, window_len=width)
# Initialize recording if requested
if record:
recorder = EDFRecorder(self.signal_label)
# Initialize LSL to stream if requested
if lsl:
from pylsl import StreamInfo, StreamOutlet
lsl_info = StreamInfo(name='Portiloop Filtered',
type='Filtered EEG',
channel_count=8,
nominal_srate=self.frequency,
channel_format='float32',
source_id='portiloop1') # TODO: replace this by unique device identifier
lsl_outlet = StreamOutlet(lsl_info)
lsl_info_raw = StreamInfo(name='Portiloop Raw Data',
type='Raw EEG signal',
channel_count=8,
nominal_srate=self.frequency,
channel_format='float32',
source_id='portiloop1') # TODO: replace this by unique device identifier
lsl_outlet_raw = StreamOutlet(lsl_info_raw)
buffer = []
# Initialize stimulation delayer if requested
if not self.spindle_detection_mode == 'Fast' and stimulator is not None:
stimulation_delayer = UpStateDelayer(self.frequency, self.spindle_freq, self.spindle_detection_mode == 'Peak', time_to_buffer=0.1)
stimulator.add_delayer(stimulation_delayer)
else:
stimulation_delayer = None
# Main capture loop
while True:
if self.signal_input == "ADS":
# Send message in communication pipe if we have one
with self._lock_msg_out:
if self._msg_out is not None:
p_msg_io.send(self._msg_out)
self._msg_out = None
# Check if we have received a message in communication pipe
if p_msg_io.poll():
mess = p_msg_io.recv()
if mess == 'STOP':
break
elif mess[0] == 'PRT':
print(mess[1])
# Retrieve all data points from data pipe p_data
point = None
if p_data_i.poll(timeout=(1 / self.frequency)):
point = p_data_i.recv()
else:
continue
# Convert point from int to corresponding value in microvolts
n_array_raw = int_to_float(np.array([point]))
elif self.signal_input == "File":
n_array_raw, gt_stimulation = file_reader.get_point()
# Go through filtering pipeline
if filter:
n_array = fp.filter(deepcopy(n_array_raw))
else:
n_array = deepcopy(n_array_raw)
# Contains the filtered point (if filtering is off, contains a copy of the raw point)
filtered_point = n_array.tolist()
# Send both raw and filtered points over LSL
if lsl:
raw_point = n_array_raw.tolist()
lsl_outlet_raw.push_sample(raw_point[-1])
lsl_outlet.push_sample(filtered_point[-1])
# Adds point to buffer for delayed stimulation
if stimulation_delayer is not None:
stimulation_delayer.step(filtered_point[0][channel-1])
# Check if detection is on or off
with self._pause_detect_lock:
pause = self._pause_detect
# If detection is on
if detector is not None and not pause:
# Detect using the latest point
detection_signal = detector.detect(filtered_point)
# Stimulate
if stimulator is not None:
stimulator.stimulate(detection_signal)
with self._test_stimulus_lock:
test_stimulus = self._test_stimulus
self._test_stimulus = False
if test_stimulus:
stimulator.test_stimulus()
if self.signal_input == "File" and gt_stimulation:
stimulator.send_stimulation("GROUND_TRUTH_STIM", False)
# Add point to the buffer to send to viz and recorder
buffer += filtered_point
if len(buffer) >= 50:
if viz:
live_disp.add_datapoints(buffer)
if record:
recorder.add_recording_data(buffer)
buffer = []
if self.signal_input == "ADS":
# Empty pipes
while True:
if p_data_i.poll():
_ = p_data_i.recv()
elif p_msg_io.poll():
_ = p_msg_io.recv()
else:
break
p_data_i.close()
p_msg_io.close()
self._p_capture.join()
self.__capture_on = False
if record:
recorder.close_recording_file()
if __name__ == "__main__":
pass