|
import time
|
|
|
|
from threading import Thread, Lock
|
|
|
|
|
|
class Listener:
|
|
task_queue = []
|
|
lock = Lock()
|
|
thread = None
|
|
|
|
@classmethod
|
|
def _process_tasks(cls):
|
|
while True:
|
|
task = None
|
|
with cls.lock:
|
|
if cls.task_queue:
|
|
task = cls.task_queue.pop(0)
|
|
|
|
if task is None:
|
|
time.sleep(0.001)
|
|
continue
|
|
|
|
func, args, kwargs = task
|
|
try:
|
|
func(*args, **kwargs)
|
|
except Exception as e:
|
|
print(f"Error in listener thread: {e}")
|
|
|
|
@classmethod
|
|
def add_task(cls, func, *args, **kwargs):
|
|
with cls.lock:
|
|
cls.task_queue.append((func, args, kwargs))
|
|
|
|
if cls.thread is None:
|
|
cls.thread = Thread(target=cls._process_tasks, daemon=True)
|
|
cls.thread.start()
|
|
|
|
|
|
def async_run(func, *args, **kwargs):
|
|
Listener.add_task(func, *args, **kwargs)
|
|
|
|
|
|
class FIFOQueue:
|
|
def __init__(self):
|
|
self.queue = []
|
|
self.lock = Lock()
|
|
print("【调试】创建新的FIFOQueue")
|
|
|
|
def push(self, item):
|
|
print(f"【调试】FIFOQueue.push: 准备添加项目: {item}")
|
|
with self.lock:
|
|
self.queue.append(item)
|
|
print(f"【调试】FIFOQueue.push: 成功添加项目: {item}, 当前队列长度: {len(self.queue)}")
|
|
|
|
def pop(self):
|
|
print("【调试】FIFOQueue.pop: 准备弹出队列首项")
|
|
with self.lock:
|
|
if self.queue:
|
|
item = self.queue.pop(0)
|
|
print(f"【调试】FIFOQueue.pop: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
|
|
return item
|
|
print("【调试】FIFOQueue.pop: 队列为空,返回None")
|
|
return None
|
|
|
|
def top(self):
|
|
print("【调试】FIFOQueue.top: 准备查看队列首项")
|
|
with self.lock:
|
|
if self.queue:
|
|
item = self.queue[0]
|
|
print(f"【调试】FIFOQueue.top: 队列首项为: {item}, 当前队列长度: {len(self.queue)}")
|
|
return item
|
|
print("【调试】FIFOQueue.top: 队列为空,返回None")
|
|
return None
|
|
|
|
def next(self):
|
|
print("【调试】FIFOQueue.next: 等待弹出队列首项")
|
|
while True:
|
|
with self.lock:
|
|
if self.queue:
|
|
item = self.queue.pop(0)
|
|
print(f"【调试】FIFOQueue.next: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
|
|
return item
|
|
|
|
time.sleep(0.001)
|
|
|
|
|
|
class AsyncStream:
|
|
def __init__(self):
|
|
self.input_queue = FIFOQueue()
|
|
self.output_queue = FIFOQueue()
|
|
|
|
|
|
class InterruptibleStreamData:
|
|
def __init__(self):
|
|
self.input_queue = FIFOQueue()
|
|
self.output_queue = FIFOQueue()
|
|
print("【调试】创建新的InterruptibleStreamData,初始化输入输出队列")
|
|
|
|
|
|
def push_output(self, item):
|
|
print(f"【调试】InterruptibleStreamData.push_output: 准备推送输出: {type(item)}")
|
|
self.output_queue.push(item)
|
|
print(f"【调试】InterruptibleStreamData.push_output: 成功推送输出")
|
|
|
|
|
|
def get_output(self):
|
|
print("【调试】InterruptibleStreamData.get_output: 准备获取下一个输出数据")
|
|
item = self.output_queue.next()
|
|
print(f"【调试】InterruptibleStreamData.get_output: 获取到输出数据: {type(item)}")
|
|
return item
|
|
|
|
|
|
def push_input(self, item):
|
|
print(f"【调试】InterruptibleStreamData.push_input: 准备推送输入: {type(item)}")
|
|
self.input_queue.push(item)
|
|
print(f"【调试】InterruptibleStreamData.push_input: 成功推送输入")
|
|
|
|
|
|
def get_input(self):
|
|
print("【调试】InterruptibleStreamData.get_input: 准备获取下一个输入数据")
|
|
item = self.input_queue.next()
|
|
print(f"【调试】InterruptibleStreamData.get_input: 获取到输入数据: {type(item)}")
|
|
return item
|
|
|