Spaces:
Sleeping
Sleeping
File size: 5,292 Bytes
35cdf83 ba200d7 45a88e4 35cdf83 45a88e4 35cdf83 ba200d7 35cdf83 ba200d7 35cdf83 6da664d 35cdf83 45a88e4 c20b8b2 45a88e4 c20b8b2 6da664d 45a88e4 11677c4 45a88e4 35cdf83 6da664d c20b8b2 35cdf83 6da664d 35cdf83 ba200d7 6da664d ba200d7 35cdf83 ba200d7 45a88e4 ba200d7 45a88e4 6da664d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
from abc import ABC, abstractmethod
import time
from threading import Thread, Lock
from pathlib import Path
import alsaaudio
import wave
import pylsl
# Abstract interface for developers:
class Stimulator(ABC):
    """Abstract interface that every stimulator implementation must follow."""

    @abstractmethod
    def stimulate(self, detection_signal):
        """
        Stimulate according to the output of the Detector.

        Args:
            detection_signal: Object: the output of the
                Detector.add_datapoints method.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def test_stimulus(self):
        """
        Optional hook called when the 'Test stimulus' button is pressed.

        The default implementation does nothing and returns None.
        """
# Example implementation for sleep spindles
class SleepSpindleRealTimeStimulator(Stimulator):
    """
    Stimulator that plays a cached WAV stimulus through ALSA and pushes LSL
    markers when the detector reports a detection.

    Playback runs in a daemon thread; at most one playback thread exists at a
    time (guarded by ``self._lock`` / ``self._thread``).
    """

    def __init__(self):
        """
        Create the LSL outlets, open the ALSA PCM device and cache the
        stimulus sound in memory.

        Raises:
            ValueError: if the WAV sample width is not 1, 2, 3 or 4 bytes.
        """
        self._sound = Path(__file__).parent / 'sounds' / 'stimulus.wav'
        print(f"DEBUG:{self._sound}")

        # Playback thread slot: None means "no sound currently playing".
        self._thread = None
        self._lock = Lock()

        # Time of the most recent detection handled without a delayer.
        self.last_detected_ts = time.time()

        # State of a pending delayed stimulation (only used with a delayer).
        self.wait_counter = 0
        self.wait_time = 0  # overwritten by delayer.stimulate() before use
        self.delayed = False

        # Minimum gap (seconds) since the previous detection for a new
        # detection to trigger a sound.
        self.wait_t = 0.4  # 400 ms

        lsl_markers_info = pylsl.StreamInfo(name='Portiloop_stimuli',
                                            type='Markers',
                                            channel_count=1,
                                            channel_format='string',
                                            source_id='portiloop1')  # TODO: replace this by unique device identifier
        lsl_markers_info_fast = pylsl.StreamInfo(name='Portiloop_stimuli_fast',
                                                 type='Markers',
                                                 channel_count=1,
                                                 channel_format='string',
                                                 source_id='portiloop1')  # TODO: replace this by unique device identifier
        self.lsl_outlet_markers = pylsl.StreamOutlet(lsl_markers_info)
        self.lsl_outlet_markers_fast = pylsl.StreamOutlet(lsl_markers_info_fast)

        self.delayer = None

        # Initialize Alsa stuff
        # Open WAV file and set PCM device
        with wave.open(str(self._sound), 'rb') as f:
            device = 'default'

            # 8bit is unsigned in wav files; otherwise we assume signed
            # data, little endian.
            sample_formats = {
                1: alsaaudio.PCM_FORMAT_U8,
                2: alsaaudio.PCM_FORMAT_S16_LE,
                3: alsaaudio.PCM_FORMAT_S24_3LE,
                4: alsaaudio.PCM_FORMAT_S32_LE,
            }
            sample_width = f.getsampwidth()
            if sample_width not in sample_formats:
                raise ValueError('Unsupported format')
            pcm_format = sample_formats[sample_width]

            # One period holds an eighth of a second worth of frames.
            self.periodsize = f.getframerate() // 8
            self.pcm = alsaaudio.PCM(channels=f.getnchannels(),
                                     rate=f.getframerate(),
                                     format=pcm_format,
                                     periodsize=self.periodsize,
                                     device=device)

            # Store data in list to avoid reopening the file.
            # Each chunk is stored exactly once (the first chunk used to be
            # stored twice, which replayed the beginning of the sound).
            chunks = []
            data = f.readframes(self.periodsize)
            while data:
                chunks.append(data)
                data = f.readframes(self.periodsize)
            self.wav_list = chunks

    def play_sound(self):
        """
        Play the cached stimulus by writing each stored chunk to the PCM
        device, in order.
        """
        for data in self.wav_list:
            self.pcm.write(data)

    def _start_sound_thread(self):
        """Start the playback thread unless one is already running."""
        with self._lock:
            if self._thread is None:
                self._thread = Thread(target=self._t_sound, daemon=True)
                self._thread.start()

    def stimulate(self, detection_signal):
        """
        Process the detector output and trigger stimulation when appropriate.

        Args:
            detection_signal: iterable of truthy/falsy detection flags
                (the output of the Detector.add_datapoints method).
        """
        for sig in detection_signal:
            # We are waiting for a delayed stimulation
            if self.delayed:
                if self.wait_counter >= self.wait_time:
                    self._start_sound_thread()
                    # NOTE(review): the delay is considered consumed even if
                    # a sound was already playing - confirm this is intended.
                    self.delayed = False
                else:
                    self.wait_counter += 1
            # We detect a stimulation
            elif sig:
                # Record time of stimulation
                self.lsl_outlet_markers_fast.push_sample(['FASTSTIM'])
                ts = time.time()

                # Prompt delayer to try and get a stimulation
                if self.delayer is not None:
                    # presumably a number of loop iterations to wait, since
                    # it is compared against wait_counter - verify against
                    # the delayer implementation.
                    self.wait_time = self.delayer.stimulate()
                    self.delayed = True
                    self.wait_counter = 0
                    continue

                # Stimulate if allowed
                if ts - self.last_detected_ts > self.wait_t:
                    self._start_sound_thread()
                # NOTE(review): refreshed on every detection, not only when a
                # sound is started - confirm against the upstream file.
                self.last_detected_ts = ts

    def _t_sound(self):
        """
        Playback thread body: push the 'STIM' marker, play the sound, then
        free the thread slot.
        """
        try:
            self.lsl_outlet_markers.push_sample(['STIM'])
            self.play_sound()
        finally:
            # Always release the slot, otherwise one failed playback would
            # block every later stimulation.
            with self._lock:
                self._thread = None

    def test_stimulus(self):
        """Play the stimulus once (bound to the 'Test stimulus' button)."""
        self._start_sound_thread()

    def add_delayer(self, delayer):
        """
        Attach a delayer; subsequent detections are routed through its
        ``stimulate()`` method to obtain the wait before playback.
        """
        self.delayer = delayer
|