URL: https://dev.to/kalyna_pro/streaming-responses-with-claude-api-in-python-2026-44la

Streaming Responses with Claude API in Python (2026)


Originally published at kalyna.pro

Streaming sends Claude's response token by token as it's generated, instead of waiting for the full completion before showing anything. For a chat UI this is the difference between a user staring at a spinner for several seconds and seeing the first words appear within a few hundred milliseconds. The Claude API Tutorial introduces the basic stream.text_stream helper — this guide covers the full picture: the raw event stream, async streaming, error handling, and a complete FastAPI endpoint that streams Claude's output to a browser.

Prerequisites

pip install anthropic
# for the API endpoint example later:
pip install fastapi uvicorn

The Simple Way: text_stream

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    # Print each text chunk as soon as it arrives
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Still inside the with block: fetch the assembled Message
    final_message = stream.get_final_message()

print(f"\n\nstop_reason: {final_message.stop_reason}")
print(f"output tokens: {final_message.usage.output_tokens}")

stream.get_final_message() returns the same Message object you'd get from a non-streaming call — complete content, stop_reason, and usage — without manually reassembling it from chunks.

The Raw Event Stream

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    # Iterate over the events themselves rather than just the text
    for event in stream:
        print(event.type)

Event types, in order:

  • message_start: initial Message shell with usage.input_tokens
  • content_block_start: a new content block begins (text, tool_use, etc.)
  • content_block_delta: incremental content, one of text_delta (.text), input_json_delta (.partial_json, for tool inputs), or thinking_delta
  • content_block_stop: the block is complete
  • message_delta: stop_reason and updated usage.output_tokens
  • message_stop: stream finished

To print only the text and report token usage, branch on the event type:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            # Incremental text for the current content block
            print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            # Sent near the end with the updated output token count
            print(f"\n[tokens so far: {event.usage.output_tokens}]", end="")
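
To make the event sequence concrete without calling the API, the sketch below replays a hand-written list of events through the same branching logic. The event objects are SimpleNamespace stand-ins invented for illustration, not SDK types, and collect is a hypothetical helper name.

```python
from types import SimpleNamespace as NS


def collect(events):
    """Reassemble text and final metadata from a sequence of stream events."""
    parts = []
    stop_reason = None
    output_tokens = 0
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            parts.append(event.delta.text)
        elif event.type == "message_delta":
            # message_delta carries the stop reason and the output token count
            stop_reason = event.delta.stop_reason
            output_tokens = event.usage.output_tokens
    return "".join(parts), stop_reason, output_tokens


# Hand-written stand-ins for the events listed above (not real SDK objects).
fake_events = [
    NS(type="message_start"),
    NS(type="content_block_start"),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Null ")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="pointer")),
    NS(type="content_block_stop"),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"), usage=NS(output_tokens=3)),
    NS(type="message_stop"),
]

text, stop_reason, output_tokens = collect(fake_events)
print(text, stop_reason, output_tokens)
```

This is the bookkeeping that get_final_message() saves you from writing by hand.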

Async Streaming

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main():
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        # Same helper as the sync client, consumed with async for
        async for text in stream.text_stream:
            print(text, end="", flush=True)


asyncio.run(main())

Building a Streaming API Endpoint (FastAPI + SSE)

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic

app = FastAPI()
client = AsyncAnthropic()


@app.get("/chat")
async def chat(message: str):
    async def event_stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                # JSON-encode each chunk: a raw newline in the text would
                # otherwise break the SSE "data:" framing. The client
                # parses each event's data as JSON and reads "text".
                payload = json.dumps({"text": text})
                yield f"data: {payload}\n\n"

        # Tell the client the stream has finished
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

X-Accel-Buffering: no stops nginx from buffering the whole response — without it, "streaming" arrives in one burst at the end. On the frontend, read with fetch + a ReadableStream reader, or EventSource for GET endpoints.
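
For readers who want to see what the consuming side does with those frames, here is a simplified Python sketch of SSE parsing. It is only an illustration: parse_sse is a hypothetical helper, it handles just the event: and data: fields, and a browser's EventSource also deals with ids, retries, comments, and other line endings.

```python
def parse_sse(raw):
    """Yield (event, data) pairs from a string of SSE frames.

    Simplified: only the event: and data: fields are handled.
    """
    for frame in raw.split("\n\n"):
        event, data_lines = "message", []
        for line in frame.split("\n"):
            if line.startswith("event:"):
                event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                value = line[len("data:"):]
                # Only a single leading space after the colon is dropped
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            # Multiple data: lines in one frame are joined with newlines
            yield event, "\n".join(data_lines)


raw = "data: Hello\n\ndata: world\n\nevent: done\ndata: {}\n\n"
for event, data in parse_sse(raw):
    print(event, repr(data))
```

The data value comes back as a raw string; the consumer decodes it in whatever way the server encoded it.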

Handling Errors and Interruptions

import anthropic

client = anthropic.Anthropic()

try:
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    # Network failure before or during the stream
    print("\n[connection lost: showing partial response]")
except anthropic.RateLimitError:
    # HTTP 429; caught before the more general APIStatusError
    print("\n[rate limited: retry shortly]")
except anthropic.APIStatusError as e:
    # Any other non-success HTTP status
    print(f"\n[API error {e.status_code}]")
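
Showing a partial response only works if the chunks received so far were kept somewhere. A minimal sketch of that pattern follows; so that it runs offline, a plain generator stands in for stream.text_stream and Python's built-in ConnectionError stands in for anthropic.APIConnectionError. Both substitutions, and the helper names, are assumptions made for illustration.

```python
def flaky_stream():
    """Stand-in for stream.text_stream that fails partway through."""
    yield "Segfault at "
    yield "midnight, "
    raise ConnectionError("simulated network drop")


def read_with_partial(chunks):
    """Consume a text stream and return (text, completed)."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except ConnectionError:
        # Keep whatever arrived before the failure instead of discarding it
        return "".join(parts), False
    return "".join(parts), True


text, completed = read_with_partial(flaky_stream())
print(repr(text), completed)
```

The completed flag lets the caller decide whether to display the text as-is, mark it as truncated, or retry.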

If the client disconnects mid-response, exit the generator early so the SDK closes the stream; this stops billing for output tokens generated into the void. For long generations, add a Request parameter to the endpoint, check await request.is_disconnected() periodically, and break if it returns true.
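
The disconnect check can be sketched without a web server. In the example below, fake_text_stream stands in for stream.text_stream and the is_disconnected coroutine stands in for Starlette's request.is_disconnected; relay and demo are hypothetical names, and the "drops on the fourth check" behaviour is simulated.

```python
import asyncio


async def fake_text_stream():
    """Stand-in for stream.text_stream: yields ten chunks."""
    for i in range(10):
        await asyncio.sleep(0)
        yield f"chunk{i} "


async def relay(chunks, is_disconnected):
    """Forward chunks until the source ends or the client goes away."""
    sent = []
    async for chunk in chunks:
        if await is_disconnected():
            # Leaving the loop lets the caller close the upstream stream
            break
        sent.append(chunk)
    return sent


async def demo():
    polls = 0

    async def is_disconnected():
        # Pretend the browser drops the connection on the fourth check
        nonlocal polls
        polls += 1
        return polls > 3

    return await relay(fake_text_stream(), is_disconnected)


sent = asyncio.run(demo())
print(sent)
```

In a real endpoint the same loop would sit inside the async with block, so breaking out of it also exits the stream's context manager.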

Streaming with Tool Use

Text still arrives via text_delta, tool arguments arrive incrementally via input_json_delta, and stream.get_final_message() gives fully-parsed tool_use blocks once the stream ends. See Claude API Function Calling for the complete tool-use loop — it works unchanged whether calls are streamed or not.
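
As an illustration of why input_json_delta fragments need to be buffered, the sketch below joins them and parses the result only when the block ends. The events are SimpleNamespace stand-ins rather than SDK objects, and the city/units tool input is made up for the example.

```python
import json
from types import SimpleNamespace as NS


def collect_tool_input(events):
    """Join input_json_delta fragments and parse them once the block stops."""
    fragments = []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            fragments.append(event.delta.partial_json)
        elif event.type == "content_block_stop":
            # Individual fragments are not valid JSON; the full string is
            return json.loads("".join(fragments))
    return None


# Hand-written stand-ins: one tool input split across two deltas.
fake_tool_events = [
    NS(type="content_block_start"),
    NS(type="content_block_delta",
       delta=NS(type="input_json_delta", partial_json='{"city": "Ky')),
    NS(type="content_block_delta",
       delta=NS(type="input_json_delta", partial_json='iv", "units": "c"}')),
    NS(type="content_block_stop"),
]

print(collect_tool_input(fake_tool_events))
```

With the SDK, get_final_message() already returns the parsed tool_use blocks, so manual buffering like this is only needed when working from raw events.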

Best Practices

  • Use get_final_message() for stop_reason/usage instead of accumulating message_delta manually
  • Use AsyncAnthropic in web backends — a sync stream blocks the event loop
  • Set Cache-Control: no-cache and X-Accel-Buffering: no for SSE behind a proxy
  • Detect client disconnects and stop generation early
  • Streaming doesn't change pricing — tokens are billed the same either way
  • Handle APIConnectionError, RateLimitError, and APIStatusError explicitly

Summary

  • stream.text_stream yields plain text chunks for display
  • Raw events: message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop
  • get_final_message() returns the complete Message after streaming
  • AsyncAnthropic + async with/async for for non-blocking backends
  • FastAPI StreamingResponse + async generator → SSE to the browser
  • Tool use streams the same way; input_json_delta carries tool arguments
