Spaces:
Sleeping
Sleeping
File size: 3,697 Bytes
35cdf83 ba200d7 45a88e4 35cdf83 45a88e4 35cdf83 ba200d7 35cdf83 ba200d7 35cdf83 45a88e4 35cdf83 ba200d7 35cdf83 ba200d7 45a88e4 ba200d7 45a88e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
from abc import ABC, abstractmethod
import time
from threading import Thread, Lock
from pathlib import Path
import alsaaudio
import wave
import pylsl
# Abstract interface for developers:
class Stimulator(ABC):
    """Base interface that custom stimulators must implement."""

    @abstractmethod
    def stimulate(self, detection_signal):
        """
        Deliver stimulation according to the output of the Detector.

        Args:
            detection_signal: Object: the output of the
                Detector.add_datapoints method.
        """
        raise NotImplementedError

    def test_stimulus(self):
        """
        Optional hook: called when the 'Test stimulus' button is pressed.

        The default implementation does nothing.
        """
        return None
# Example implementation for sleep spindles
class SleepSpindleRealTimeStimulator(Stimulator):
    """
    Example stimulator: plays a WAV sound through ALSA and pushes a 'STIM'
    marker on an LSL outlet whenever a detection is signalled.

    The sound is played on a background daemon thread; at most one playback
    thread exists at a time (guarded by ``self._lock``).
    """

    def __init__(self):
        """
        Create the LSL marker outlet, open the ALSA PCM device and cache the
        stimulus WAV file in memory.

        Raises:
            ValueError: if the WAV sample width is not 1, 2, 3 or 4 bytes.
        """
        self._sound = Path(__file__).parent / 'sounds' / 'stimulus.wav'
        print(f"DEBUG:{self._sound}")
        self._thread = None  # currently running playback thread, if any
        self._lock = Lock()  # protects self._thread
        self.last_detected_ts = time.time()
        self.wait_t = 0.4  # 400 ms

        lsl_markers_info = pylsl.StreamInfo(name='Portiloop_stimuli',
                                            type='Markers',
                                            channel_count=1,
                                            channel_format='string',
                                            source_id='portiloop1')  # TODO: replace this by unique device identifier
        self.lsl_outlet_markers = pylsl.StreamOutlet(lsl_markers_info)

        # Initialize Alsa stuff
        # Open WAV file and set PCM device
        with wave.open(str(self._sound), 'rb') as f:
            device = 'default'

            sample_width = f.getsampwidth()
            # 8bit is unsigned in wav files
            if sample_width == 1:
                pcm_format = alsaaudio.PCM_FORMAT_U8
            # Otherwise we assume signed data, little endian
            elif sample_width == 2:
                pcm_format = alsaaudio.PCM_FORMAT_S16_LE
            elif sample_width == 3:
                pcm_format = alsaaudio.PCM_FORMAT_S24_3LE
            elif sample_width == 4:
                pcm_format = alsaaudio.PCM_FORMAT_S32_LE
            else:
                raise ValueError('Unsupported format')

            # Number of frames per chunk: one eighth of a second of audio.
            self.periodsize = f.getframerate() // 8

            self.pcm = alsaaudio.PCM(channels=f.getnchannels(),
                                     rate=f.getframerate(),
                                     format=pcm_format,
                                     periodsize=self.periodsize,
                                     device=device)

            # Store data in list to avoid reopening the file.
            # readframes() returns b'' once the file is exhausted; each chunk
            # is stored exactly once (the first chunk used to be stored twice).
            self.wav_list = list(iter(lambda: f.readframes(self.periodsize), b''))

    def play_sound(self):
        """
        Play the stimulus by writing the cached WAV chunks, in order, to the
        PCM device.
        """
        for data in self.wav_list:
            self.pcm.write(data)

    def stimulate(self, detection_signal):
        """
        Start a stimulus for positive detections, at most one at a time.

        Args:
            detection_signal: iterable of truthy/falsy detection results
                (the output of the Detector.add_datapoints method).
        """
        for sig in detection_signal:
            if not sig:
                continue
            ts = time.time()
            # Only stimulate if more than wait_t seconds passed since the
            # previous detection.
            if ts - self.last_detected_ts > self.wait_t:
                self._start_sound_thread()
            # NOTE(review): the timestamp is refreshed on every positive
            # detection, not only when a stimulus was started. The original
            # indentation was lost, so confirm this is the intended placement.
            self.last_detected_ts = ts

    def _start_sound_thread(self):
        """Spawn the playback thread unless one is already running."""
        with self._lock:
            if self._thread is None:
                self._thread = Thread(target=self._t_sound, daemon=True)
                self._thread.start()

    def _t_sound(self):
        """Thread body: push the 'STIM' marker, then play the sound."""
        try:
            self.lsl_outlet_markers.push_sample(['STIM'])
            self.play_sound()
        finally:
            # Always release the slot, even if playback raised; otherwise no
            # further stimulus could ever be started.
            with self._lock:
                self._thread = None

    def test_stimulus(self):
        """
        Called when the 'Test stimulus' button is pressed: play the stimulus
        immediately unless a playback is already in progress.
        """
        self._start_sound_thread()
|