Spaces:
Running
Running
import numpy as np | |
import asyncio | |
import websockets | |
import json | |
async def test_ws(): | |
uri = "wss://androidguy-speaker-diarization.hf.space/ws_inference" | |
async with websockets.connect(uri) as websocket: | |
print("Connected") | |
print(await websocket.recv()) # connection_established | |
for i in range(20): | |
# Generate random "noise" audio instead of silence | |
audio = (np.random.randn(3200) * 3000).astype(np.int16) | |
await websocket.send(audio.tobytes()) | |
print(f"Sent audio chunk {i+1}/20") | |
await asyncio.sleep(0.050) | |
try: | |
while True: | |
res = await asyncio.wait_for(websocket.recv(), timeout=10) | |
print("Received:", json.dumps(json.loads(res), indent=2)) | |
except asyncio.TimeoutError: | |
print("[ERROR] No more responses (timeout)") | |
asyncio.run(test_ws()) |