VOOZH about

URL: https://pypi.org/project/faststream/

⇱ faststream Β· PyPI


Skip to main content

faststream 0.7.1

pip install faststream

Latest release

Released:

FastStream: the simplest way to work with a messaging queues

Navigation

Verified details

These details have been verified by PyPI
Owner
Maintainers
πŸ‘ Avatar for lancetnik from gravatar.com
lancetnik

Unverified details

These details have not been verified by PyPI
Project links
Meta
  • License Expression: Apache-2.0
    SPDX License Expression
  • Author: AG2AI, Nikita Pastukhov
  • Tags rabbitmq , kafka , nats , redis , mqtt , asyncapi , framework , message brokers
  • Requires: Python >=3.10
  • Provides-Extra: cli , confluent , kafka , mqtt , nats , otel , prometheus , rabbit , redis

Project description

FastStream

FastStream is an asynchronous Python framework for building event-driven applications. It brings together message broker integration, dependency injection, validation, testing utilities, and AsyncAPI documentation generation in a single toolkit, reducing boilerplate without hiding the capabilities of the underlying broker.



Features

FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.

  • Multiple Brokers: FastStream provides a suitable API to work across multiple message brokers (Kafka, RabbitMQ, NATS, Redis, MQTT support)

  • Built-in Serialization: Leverage Pydantic or Msgspec validation capabilities to serialize and validate incoming messages

  • Automatic Docs: Stay ahead with automatic AsyncAPI documentation

  • Intuitive: Full-typed editor support makes your development experience smooth, catching errors before they reach runtime

  • Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system

  • Testable: Supports in-memory tests, making your CI/CD pipeline faster and more reliable

  • Extensible: Use extensions for lifespans, custom serialization and middleware

  • Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)

That's FastStream in a nutshell - easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.


Documentation: https://faststream.ag2.ai/latest/


Versioning Policy

FastStream has a stable public API. Only major updates may introduce breaking changes.

Prior to FastStream's 1.0 release, each minor update is considered a major and can introduce breaking changes, but these changes were communicated through two-versions deprecation warnings prior to being fully removed. So features deprecated in the 0.4 version were only removed in version 0.6.

Our team is working toward the stable 1.0 version.

Installation

FastStream works on Linux, macOS, Windows and most Unix-style operating systems. You can install it with pip as usual:

pipinstall'faststream[kafka]'
# or
pipinstall'faststream[confluent]'
# or
pipinstall'faststream[rabbit]'
# or
pipinstall'faststream[nats]'
# or
pipinstall'faststream[redis]'
# or
pipinstall'faststream[mqtt]'

Writing app code

FastStream brokers provide convenient function decorators @broker.subscriber and @broker.publisher to allow you to delegate the actual process of:

  • consuming and producing data to Event queues, and

  • decoding and encoding JSON-encoded messages

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

Also, FastStream uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize your input messages just using type annotations.

Here is an example Python app using FastStream that consumes data from an incoming data stream and outputs the data to another one:

fromfaststreamimport FastStream
fromfaststream.kafkaimport KafkaBroker
# from faststream.confluent import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker
# from faststream.mqtt import MQTTBroker

broker = KafkaBroker("localhost:9092")
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")
# broker = MQTTBroker("localhost")

app = FastStream(broker)

@broker.subscriber("in")
@broker.publisher("out")
async defhandle_msg(user: str, user_id: int) -> str:
 return f"User: {user_id} - {user} registered"

Pydantic serialization

Also, Pydantic’s BaseModel class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.

frompydanticimport BaseModel, Field, PositiveInt
fromfaststreamimport FastStream
fromfaststream.kafkaimport KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

classUser(BaseModel):
 user: str = Field(..., examples=["John"])
 user_id: PositiveInt = Field(..., examples=["1"])

@broker.subscriber("in")
@broker.publisher("out")
async defhandle_msg(data: User) -> str:
 return f"User: {data.user} - {data.user_id} registered"

By default we use PydanticV2 written in Rust as serialization library, but you can downgrade it manually, if your platform has no Rust support - FastStream will work correctly with PydanticV1 as well.

To choose the Pydantic version, you can install the required one using the regular

pipinstallpydantic==1.X.Y

FastStream (and FastDepends inside) should work correctly with almost any version.

Msgspec serialization

Moreover, FastStream is not tied to any specific serialization library, so you can use any preferred one. Fortunately, we provide a built‑in alternative for the most popular Pydantic replacement - Msgspec.

fromfast_depends.msgspecimport MsgSpecSerializer
fromfaststream.kafkaimport KafkaBroker

broker = KafkaBroker(serializer=MsgSpecSerializer())

You can read more about the feature in the documentation.

Unified API

At first glance, FastStream unifies various broker backends under a single API. However, a completely unified API inevitably results in missing features. We do not want to limit users' choices. If you prefer Kafka over Redis, there is a reason. Therefore, we support all native broker features you need.

Consequently, our unified API has a relatively limited scope:

fromfaststream.[broker] import[Broker], [Broker]Message

broker = [Broker](*servers)

@broker.subscriber([source]) # Kafka topic / RMQ queue / NATS subject / MQTT topic / etc
@broker.publisher([destination]) # topic / routing key / subject / etc
async defhandler(msg: [Broker]Message) -> None:
 await msg.ack() # control brokers' acknowledgement policy

...

await broker.publish("Message", [destiination])

Beyond this scope you can use any broker-native features you need:

  • Kafka - specific partition reads, partitioner control, consumer groups, batch processing, etc.
  • RabbitMQ - all exchange types, Redis Streams, RPC, manual channel configuration, DLQ, etc.
  • NATS - core and Push/Pull JetStream subscribers, KeyValue, ObjectStorage, RPC, etc.
  • Redis - Pub/Sub, List, Stream subscribers, consumer groups, acknowledgements, etc.
  • MQTT - topic subscriptions (including wildcards), QoS and retain, MQTT 3.1.1 and 5.0, request/reply (RPC), TLS, etc.

You can find detailed information about all supported features in FastStream’s broker‑specific documentation.

If a particular feature is missing or not yet supported, you can always fall back to the native broker client/connection for those operations.


Testing the service

The service can be tested using the TestBroker context managers, which, by default, puts the Broker into "testing mode".

The Tester will redirect your subscriber and publisher decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

importpytest
importpydantic
fromfaststream.kafkaimport TestKafkaBroker


@pytest.mark.asyncio
async deftest_correct():
 async with TestKafkaBroker(broker) as br:
 await br.publish({
 "user": "John",
 "user_id": 1,
 }, "in")

@pytest.mark.asyncio
async deftest_invalid():
 async with TestKafkaBroker(broker) as br:
 with pytest.raises(pydantic.ValidationError):
 await br.publish("wrong message", "in")

Running the application

The application can be started using built-in FastStream CLI command.

Before running the service, install FastStream CLI using the following command:

pipinstall"faststream[cli]"

To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

faststreamrunbasic:app

After running the command, you should see the following output:

INFO-FastStreamappstarting...
INFO-input_data|-`HandleMsg`waitingformessages
INFO-FastStreamappstartedsuccessfully!ToexitpressCTRL+C

Also, FastStream provides you with a great hot reload feature to improve your Development Experience

faststreamrunbasic:app--reload

And multiprocessing horizontal scaling feature as well:

faststreamrunbasic:app--workers3

You can learn more about CLI features here


Project Documentation

FastStream automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a web view of your documentation on resources available to related teams.

The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message formats the application works with. And most importantly, it won't cost anything - FastStream has already created the docs for you!

πŸ‘ HTML-page


Dependencies

FastStream (thanks to FastDepends) has a dependency management system similar to pytest fixtures and FastAPI Depends at the same time. Function arguments declare which dependencies you want are needed, and a special decorator delivers them from the global Context object.

fromtypingimport Annotated
fromfaststreamimport Depends, Logger

async defbase_dep(user_id: int) -> bool:
 return True

@broker.subscriber("in-test")
async defbase_handler(user: str,
 logger: Logger,
 dep: Annotated[bool, Depends(base_dep)]):
 assert dep is True
 logger.info(user)

HTTP Frameworks integrations

Any Framework

You can use FastStream MQBrokers without a FastStream application. Just start and stop them according to your application's lifespan.

fromaiohttpimport web

fromfaststream.kafkaimport KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("test")
async defbase_handler(body):
 print(body)

async defstart_broker(app):
 await broker.start()

async defstop_broker(app):
 await broker.stop()

async defhello(request):
 return web.Response(text="Hello, world")

app = web.Application()
app.add_routes([web.get("/", hello)])
app.on_startup.append(start_broker)
app.on_cleanup.append(stop_broker)

if __name__ == "__main__":
 web.run_app(app)

FastAPI Plugin

Also, FastStream can be used as part of FastAPI.

Just import a StreamRouter you need and declare the message handler with the same @router.subscriber(...) and @router.publisher(...) decorators.

fromfastapiimport FastAPI
frompydanticimport BaseModel

fromfaststream.kafka.fastapiimport KafkaRouter

router = KafkaRouter("localhost:9092")

classIncoming(BaseModel):
 m: dict

@router.subscriber("test")
@router.publisher("response")
async defhello(m: Incoming):
 return {"response": "Hello, world!"}

app = FastAPI()
app.include_router(router)

More integration features can be found here


Benchmarks

We use codspeed to run benchmarks for both FastStream itself and raw clients.

Stay in touch

Please show your support and stay in touch by:

Your support helps us to stay in touch with you and encourages us to continue developing and improving the framework. Thank you for your support!


Contributors

Thanks to all of these amazing people who made the project better!

πŸ‘ Image

Project details

Verified details

These details have been verified by PyPI
Owner
Maintainers
πŸ‘ Avatar for lancetnik from gravatar.com
lancetnik

Unverified details

These details have not been verified by PyPI
Project links
Meta
  • License Expression: Apache-2.0
    SPDX License Expression
  • Author: AG2AI, Nikita Pastukhov
  • Tags rabbitmq , kafka , nats , redis , mqtt , asyncapi , framework , message brokers
  • Requires: Python >=3.10
  • Provides-Extra: cli , confluent , kafka , mqtt , nats , otel , prometheus , rabbit , redis

Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream-0.7.1.tar.gz (329.5 kB view details)

Uploaded Source

Built Distribution

Filter files by name, interpreter, ABI, and platform.

If you're not sure about the file name format, learn more about wheel file names.

Copy a direct link to the current filters

faststream-0.7.1-py3-none-any.whl (556.5 kB view details)

Uploaded Python 3

File details

Details for the file faststream-0.7.1.tar.gz.

File metadata

  • Download URL: faststream-0.7.1.tar.gz
  • Upload date:
  • Size: 329.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for faststream-0.7.1.tar.gz
Algorithm Hash digest
SHA256 11816dd4ca678f6b173159c456619993b09684b684abe4010366d6a0c8059c58
MD5 c4b28bff10225dc093f22dbf928e219c
BLAKE2b-256 5a927f5edc7e1945968331c185ee197b690b413da861649d5859e3f377348acd

See more details on using hashes here.

File details

Details for the file faststream-0.7.1-py3-none-any.whl.

File metadata

  • Download URL: faststream-0.7.1-py3-none-any.whl
  • Upload date:
  • Size: 556.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for faststream-0.7.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d4f971f6ba7621a05a67a8fcb4436fdcf89b5f21b33eb107f193b1c9553738a0
MD5 d1786342360642c322beccaaae7eb0d7
BLAKE2b-256 122e73d58856a305a73ac31d3d950f9b96a2ef4c1a6071e38b339d1b0b466418

See more details on using hashes here.

Supported by

πŸ‘ Image
AWS Cloud computing and Security Sponsor πŸ‘ Image
Datadog Monitoring πŸ‘ Image
Depot Continuous Integration πŸ‘ Image
Fastly CDN πŸ‘ Image
Google Download Analytics πŸ‘ Image
Pingdom Monitoring πŸ‘ Image
Sentry Error logging πŸ‘ Image
StatusPage Status page