Speaker-Diarization / test_websocket.py
Saiyaswanth007's picture
Verbose
f722385
raw
history blame contribute delete
903 Bytes
import numpy as np
import asyncio
import websockets
import json
async def test_ws(
    uri="wss://androidguy-speaker-diarization.hf.space/ws_inference",
    num_chunks=20,
    recv_timeout=10,
):
    """Stream random-noise audio chunks to a WebSocket endpoint and print replies.

    Connects to ``uri``, prints the first message the server sends, pushes
    ``num_chunks`` binary frames of random int16 samples, then prints every
    further message until the server stays silent for ``recv_timeout``
    seconds or closes the connection.

    Args:
        uri: WebSocket URL to connect to.
        num_chunks: Number of noise chunks to send.
        recv_timeout: Seconds to wait for each reply after sending is done
            before giving up.
    """
    samples_per_chunk = 3200
    noise_scale = 3000
    send_interval = 0.050  # seconds to pause between consecutive chunks
    int16 = np.iinfo(np.int16)

    async with websockets.connect(uri) as websocket:
        print("Connected")
        # First server message; the original author labels it
        # "connection_established" (not verified against the server here).
        print(await websocket.recv())

        try:
            for i in range(num_chunks):
                # Random "noise" audio instead of silence. Clip before the
                # cast: converting an out-of-range float to int16 is undefined.
                noise = np.random.randn(samples_per_chunk) * noise_scale
                audio = np.clip(noise, int16.min, int16.max).astype(np.int16)
                await websocket.send(audio.tobytes())
                print(f"Sent audio chunk {i+1}/{num_chunks}")
                await asyncio.sleep(send_interval)

            try:
                while True:
                    res = await asyncio.wait_for(
                        websocket.recv(), timeout=recv_timeout
                    )
                    try:
                        payload = json.loads(res)
                    except ValueError:
                        # Non-JSON text or undecodable binary frame: show it
                        # raw rather than aborting the whole session.
                        print("Received (non-JSON):", res)
                    else:
                        print("Received:", json.dumps(payload, indent=2))
            except asyncio.TimeoutError:
                print("[ERROR] No more responses (timeout)")
        except websockets.ConnectionClosed as exc:
            # The server hanging up ends the session; report it instead of
            # letting the traceback escape.
            print(f"Connection closed: {exc}")
if __name__ == "__main__":
    # Guarded so that merely importing this module (for example a test
    # runner collecting test_*.py files) does not open a network connection.
    asyncio.run(test_ws())