File size: 5,292 Bytes
35cdf83
 
ba200d7
 
45a88e4
 
 
35cdf83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45a88e4
 
 
 
 
 
35cdf83
 
ba200d7
35cdf83
 
 
ba200d7
 
 
 
35cdf83
6da664d
 
35cdf83
45a88e4
 
 
 
 
 
c20b8b2
 
 
 
 
 
 
45a88e4
c20b8b2
 
6da664d
45a88e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11677c4
45a88e4
 
 
 
 
 
 
35cdf83
 
 
6da664d
 
 
 
 
 
 
 
 
 
 
 
 
c20b8b2
35cdf83
6da664d
 
 
 
 
 
 
 
 
35cdf83
ba200d7
6da664d
ba200d7
 
35cdf83
ba200d7
 
45a88e4
 
ba200d7
 
45a88e4
 
 
 
 
 
6da664d
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from abc import ABC, abstractmethod
import time
from threading import Thread, Lock
from pathlib import Path
import alsaaudio
import wave
import pylsl


# Abstract interface for developers:

class Stimulator(ABC):

    @abstractmethod
    def stimulate(self, detection_signal):
        """
        Stimulates accordingly to the output of the Detector.
        
        Args:
            detection_signal: Object: the output of the Detector.add_datapoints method.
        """
        raise NotImplementedError
    
    def test_stimulus(self):
        """
        Optional: this is called when the 'Test stimulus' button is pressed.
        """
        pass

        
# Example implementation for sleep spindles

class SleepSpindleRealTimeStimulator(Stimulator):
    def __init__(self):
        self._sound = Path(__file__).parent / 'sounds' / 'stimulus.wav'
        print(f"DEBUG:{self._sound}")
        self._thread = None
        self._lock = Lock()
        self.last_detected_ts = time.time()
        self.wait_counter = 0
        self.delayed = False
        self.wait_t = 0.4  # 400 ms
        
        lsl_markers_info = pylsl.StreamInfo(name='Portiloop_stimuli',
                                  type='Markers',
                                  channel_count=1,
                                  channel_format='string',
                                  source_id='portiloop1')  # TODO: replace this by unique device identifier
        
        lsl_markers_info_fast = pylsl.StreamInfo(name='Portiloop_stimuli_fast',
                                  type='Markers',
                                  channel_count=1,
                                  channel_format='string',
                                  source_id='portiloop1')  # TODO: replace this by unique device identifier
        
        self.lsl_outlet_markers = pylsl.StreamOutlet(lsl_markers_info)
        self.lsl_outlet_markers_fast = pylsl.StreamOutlet(lsl_markers_info_fast)

        self.delayer = None
        
        # Initialize Alsa stuff
        # Open WAV file and set PCM device
        with wave.open(str(self._sound), 'rb') as f: 
            device = 'default'

            format = None

            # 8bit is unsigned in wav files
            if f.getsampwidth() == 1:
                format = alsaaudio.PCM_FORMAT_U8
            # Otherwise we assume signed data, little endian
            elif f.getsampwidth() == 2:
                format = alsaaudio.PCM_FORMAT_S16_LE
            elif f.getsampwidth() == 3:
                format = alsaaudio.PCM_FORMAT_S24_3LE
            elif f.getsampwidth() == 4:
                format = alsaaudio.PCM_FORMAT_S32_LE
            else:
                raise ValueError('Unsupported format')

            self.periodsize = f.getframerate() // 8

            self.pcm = alsaaudio.PCM(channels=f.getnchannels(), rate=f.getframerate(), format=format, periodsize=self.periodsize, device=device)
            
            # Store data in list to avoid reopening the file
            data = f.readframes(self.periodsize)
            self.wav_list = [data]
            while data:
                self.wav_list.append(data)
                data = f.readframes(self.periodsize)            
       
    def play_sound(self):
        '''
        Open the wav file and play a sound
        '''
        for data in self.wav_list:
            self.pcm.write(data) 
    
    def stimulate(self, detection_signal):
        for sig in detection_signal:
            # We are waiting for a delayed stimulation
            if self.delayed:
                if self.wait_counter >= self.wait_time:
                    with self._lock:
                        if self._thread is None: 
                            self._thread = Thread(target=self._t_sound, daemon=True)
                            self._thread.start()
                    self.delayed = False
                else:
                    self.wait_counter += 1
            # We detect a stimulation
            elif sig:
                # Record time of stimulation
                self.lsl_outlet_markers_fast.push_sample(['FASTSTIM'])
                ts = time.time()
                
                # Prompt delayer to try and get a stimulation
                if self.delayer is not None:
                    self.wait_time = self.delayer.stimulate()
                    self.delayed = True
                    self.wait_counter = 0
                    continue
                
                # Stimulate if allowed
                if ts - self.last_detected_ts > self.wait_t:
                    with self._lock:
                        if self._thread is None: 
                            self._thread = Thread(target=self._t_sound, daemon=True)
                            self._thread.start()
                self.last_detected_ts = ts
                
    def _t_sound(self):
        self.lsl_outlet_markers.push_sample(['STIM'])
        self.play_sound()
        with self._lock:
            self._thread = None
    
    def test_stimulus(self):
        with self._lock:
            if self._thread is None:
                self._thread = Thread(target=self._t_sound, daemon=True)
                self._thread.start()
                
    def add_delayer(self, delayer):
        self.delayer = delayer