VOOZH about

URL: https://dev.to/martschweiger/build-an-agora-transcription-bot-with-assemblyai-universal-3-pro-51jb

⇱ Build an Agora Transcription Bot with AssemblyAI Universal-3 Pro - DEV Community


Build an Agora Transcription Bot with AssemblyAI Universal-3 Pro

This tutorial walks through building a real-time transcription bot in Python that joins an Agora channel as a silent observer, captures each participant's audio as raw PCM frames, and streams it to AssemblyAI Universal-3 Pro Streaming for speaker-aware transcripts.

The full source is available at github.com/kelseyefoster/voice-agent-agora-universal-3-pro.

Why This Stack?

Agora's Python Server SDK lets a server-side bot join channels, subscribe to participant audio as raw PCM frames, and optionally publish audio back — without any browser or mobile client. This PCM stream format aligns directly with what AssemblyAI Universal-3 Pro Streaming expects, making the integration unusually clean.

Metric AssemblyAI Universal-3 Pro Agora Built-in STT
P50 latency 307ms ~600–900ms
Word Error Rate 8.9% ~14–18%
Speaker diarization ✅ Real-time
Languages 99+ Limited

Prerequisites

Quick Start

git clone https://github.com/kelseyefoster/voice-agent-agora-universal-3-pro
cd voice-agent-agora-universal-3-pro

pip install -r requirements.txt
cp .env.example .env
# Fill in AGORA_APP_ID, AGORA_APP_CERT, ASSEMBLYAI_API_KEY

python bot.py --channel my-channel

The bot joins the channel, opens one AssemblyAI WebSocket per participant, and prints completed turn transcripts to stdout. Press Ctrl+C to stop cleanly.

Environment Variables

AGORA_APP_ID=your_agora_app_id
AGORA_APP_CERT=your_agora_certificate
AGORA_CHANNEL=my-channel
AGORA_BOT_UID=9999
ASSEMBLYAI_API_KEY=your_assemblyai_api_key

How It Works

1. Join the channel as an audience bot

from agora.rtc.agora_service import AgoraService, AgoraServiceConfig
from agora.rtc.rtc_connection import RTCConnConfig
from agora.rtc.agora_base import ClientRoleType, ChannelProfileType, AudioScenarioType

cfg = AgoraServiceConfig()
cfg.appid = AGORA_APP_ID
cfg.enable_audio_processor = True
cfg.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS

service = AgoraService()
service.initialize(cfg)

conn_cfg = RTCConnConfig(
 client_role_type=ClientRoleType.CLIENT_ROLE_AUDIENCE,
 channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
)
connection = service.create_rtc_connection(conn_cfg)
connection.connect(token, channel, str(bot_uid))

2. Configure 16 kHz audio output before subscribing

agora_channel = connection.get_local_user()

# Set BEFORE subscribe_all_audio — eliminates resampling
agora_channel.set_playback_audio_frame_before_mixing_parameters(
 num_of_channels=1,
 sample_rate=16000,
)
agora_channel.subscribe_all_audio()

Each PcmAudioFrame will contain 160 samples of 16-bit little-endian PCM at 16 kHz mono — exactly what AssemblyAI expects.

3. Open one AssemblyAI WebSocket per participant

AAI_WS_URL = (
 "wss://streaming.assemblyai.com/v3/ws"
 f"?sample_rate=16000"
 "&speech_model=u3-rt-pro"
 "&format_turns=true"
)

async def stream_participant(agora_channel, uid: int, api_key: str):
 headers = {"Authorization": api_key}
 async with websockets.connect(AAI_WS_URL, additional_headers=headers) as ws:
 begin = json.loads(await ws.recv())
 print(f"[uid={uid}] Session: {begin['id']}")

 async def send_audio():
 async for frame in agora_channel.get_audio_frames(uid):
 await ws.send(frame.data)

 async def recv_transcripts():
 async for message in ws:
 event = json.loads(message)
 if event["type"] == "Turn" and event.get("end_of_turn"):
 print(f"[uid={uid}] {event['transcript']}")

 await asyncio.gather(send_audio(), recv_transcripts())

4. Track participants dynamically

active_streams: dict[int, asyncio.Task] = {}

def on_user_joined(uid: int):
 task = asyncio.create_task(stream_participant(agora_channel, uid, api_key))
 active_streams[uid] = task

def on_user_left(uid: int, reason: int):
 if uid in active_streams:
 active_streams[uid].cancel()
 del active_streams[uid]

connection.register_observer_callback("on_user_joined", on_user_joined)
connection.register_observer_callback("on_user_offline", on_user_left)

5. Terminate cleanly

async def close_stream(ws):
 await ws.send(json.dumps({"type": "Terminate"}))
 async for message in ws:
 event = json.loads(message)
 if event["type"] == "Termination":
 print(f"Audio processed: {event['audio_duration_seconds']}s")
 break

Production Token Generation

pip install agora-token-builder
from agora_token_builder import RtcTokenBuilder, Role_Subscriber
import time

def generate_bot_token(app_id, app_cert, channel, uid):
 expire = int(time.time()) + 3600
 return RtcTokenBuilder.buildTokenWithUid(
 app_id, app_cert, channel, uid, Role_Subscriber, expire
 )

Extending the Bot

The end_of_turn transcript is a clean signal to drive downstream logic:

if event["type"] == "Turn" and event.get("end_of_turn"):
 transcript = event["transcript"]

 # Option A: send to an LLM
 await send_to_llm(uid, transcript)

 # Option B: store in a database
 await db.insert(uid=uid, text=transcript)

 # Option C: trigger a webhook
 await post_webhook({"uid": uid, "text": transcript})

Resources