faststream 0.7.1
pip install faststream
Released:
FastStream: the simplest way to work with a messaging queues
Navigation
Verified details
These details have been verified by PyPIOwner
Maintainers
π Avatar for lancetnik from gravatar.comlancetnik
Unverified details
These details have not been verified by PyPIProject links
Meta
-
License Expression: Apache-2.0
SPDX License Expression - Author: AG2AI, Nikita Pastukhov
- Tags rabbitmq , kafka , nats , redis , mqtt , asyncapi , framework , message brokers
- Requires: Python >=3.10
-
Provides-Extra:
cli,confluent,kafka,mqtt,nats,otel,prometheus,rabbit,redis
Classifiers
- Development Status
- Environment
- Framework
- Intended Audience
- License
- Operating System
- Programming Language
- Topic
- Typing
Project description
FastStream
FastStream is an asynchronous Python framework for building event-driven applications. It brings together message broker integration, dependency injection, validation, testing utilities, and AsyncAPI documentation generation in a single toolkit, reducing boilerplate without hiding the capabilities of the underlying broker.
π Test Passing
π Coverage
π Downloads
π Package version
π Supported Python versions
π CodeQL
π Dependency Review
π License
π Code of Conduct
π Discord
π FastStream
π Telegram
π Gurubase
Features
FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.
Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.
-
Multiple Brokers: FastStream provides a suitable API to work across multiple message brokers (Kafka, RabbitMQ, NATS, Redis, MQTT support)
-
Built-in Serialization: Leverage Pydantic or Msgspec validation capabilities to serialize and validate incoming messages
-
Automatic Docs: Stay ahead with automatic AsyncAPI documentation
-
Intuitive: Full-typed editor support makes your development experience smooth, catching errors before they reach runtime
-
Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system
-
Testable: Supports in-memory tests, making your CI/CD pipeline faster and more reliable
-
Extensible: Use extensions for lifespans, custom serialization and middleware
-
Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)
That's FastStream in a nutshell - easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.
Documentation: https://faststream.ag2.ai/latest/
Versioning Policy
FastStream has a stable public API. Only major updates may introduce breaking changes.
Prior to FastStream's 1.0 release, each minor update is considered a major and can introduce breaking changes, but these changes were communicated through two-versions deprecation warnings prior to being fully removed. So features deprecated in the 0.4 version were only removed in version 0.6.
Our team is working toward the stable 1.0 version.
Installation
FastStream works on Linux, macOS, Windows and most Unix-style operating systems.
You can install it with pip as usual:
pipinstall'faststream[kafka]' # or pipinstall'faststream[confluent]' # or pipinstall'faststream[rabbit]' # or pipinstall'faststream[nats]' # or pipinstall'faststream[redis]' # or pipinstall'faststream[mqtt]'
Writing app code
FastStream brokers provide convenient function decorators @broker.subscriber
and @broker.publisher to allow you to delegate the actual process of:
-
consuming and producing data to Event queues, and
-
decoding and encoding JSON-encoded messages
These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.
Also, FastStream uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize your input messages just using type annotations.
Here is an example Python app using FastStream that consumes data from an incoming data stream and outputs the data to another one:
fromfaststreamimport FastStream fromfaststream.kafkaimport KafkaBroker # from faststream.confluent import KafkaBroker # from faststream.rabbit import RabbitBroker # from faststream.nats import NatsBroker # from faststream.redis import RedisBroker # from faststream.mqtt import MQTTBroker broker = KafkaBroker("localhost:9092") # broker = RabbitBroker("amqp://guest:guest@localhost:5672/") # broker = NatsBroker("nats://localhost:4222/") # broker = RedisBroker("redis://localhost:6379/") # broker = MQTTBroker("localhost") app = FastStream(broker) @broker.subscriber("in") @broker.publisher("out") async defhandle_msg(user: str, user_id: int) -> str: return f"User: {user_id} - {user} registered"
Pydantic serialization
Also, Pydanticβs BaseModel class allows you
to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.
frompydanticimport BaseModel, Field, PositiveInt fromfaststreamimport FastStream fromfaststream.kafkaimport KafkaBroker broker = KafkaBroker("localhost:9092") app = FastStream(broker) classUser(BaseModel): user: str = Field(..., examples=["John"]) user_id: PositiveInt = Field(..., examples=["1"]) @broker.subscriber("in") @broker.publisher("out") async defhandle_msg(data: User) -> str: return f"User: {data.user} - {data.user_id} registered"
By default we use PydanticV2 written in Rust as serialization library, but you can downgrade it manually, if your platform has no Rust support - FastStream will work correctly with PydanticV1 as well.
To choose the Pydantic version, you can install the required one using the regular
pipinstallpydantic==1.X.Y
FastStream (and FastDepends inside) should work correctly with almost any version.
Msgspec serialization
Moreover, FastStream is not tied to any specific serialization library, so you can use any preferred one. Fortunately, we provide a builtβin alternative for the most popular Pydantic replacement - Msgspec.
fromfast_depends.msgspecimport MsgSpecSerializer fromfaststream.kafkaimport KafkaBroker broker = KafkaBroker(serializer=MsgSpecSerializer())
You can read more about the feature in the documentation.
Unified API
At first glance, FastStream unifies various broker backends under a single API. However, a completely unified API inevitably results in missing features. We do not want to limit users' choices. If you prefer Kafka over Redis, there is a reason. Therefore, we support all native broker features you need.
Consequently, our unified API has a relatively limited scope:
fromfaststream.[broker] import[Broker], [Broker]Message broker = [Broker](*servers) @broker.subscriber([source]) # Kafka topic / RMQ queue / NATS subject / MQTT topic / etc @broker.publisher([destination]) # topic / routing key / subject / etc async defhandler(msg: [Broker]Message) -> None: await msg.ack() # control brokers' acknowledgement policy ... await broker.publish("Message", [destiination])
Beyond this scope you can use any broker-native features you need:
- Kafka - specific partition reads, partitioner control, consumer groups, batch processing, etc.
- RabbitMQ - all exchange types, Redis Streams, RPC, manual channel configuration, DLQ, etc.
- NATS - core and Push/Pull JetStream subscribers, KeyValue, ObjectStorage, RPC, etc.
- Redis - Pub/Sub, List, Stream subscribers, consumer groups, acknowledgements, etc.
- MQTT - topic subscriptions (including wildcards), QoS and retain, MQTT 3.1.1 and 5.0, request/reply (RPC), TLS, etc.
You can find detailed information about all supported features in FastStreamβs brokerβspecific documentation.
If a particular feature is missing or not yet supported, you can always fall back to the native broker client/connection for those operations.
Testing the service
The service can be tested using the TestBroker context managers, which, by default, puts the Broker into "testing mode".
The Tester will redirect your subscriber and publisher decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.
Using pytest, the test for our service would look like this:
importpytest importpydantic fromfaststream.kafkaimport TestKafkaBroker @pytest.mark.asyncio async deftest_correct(): async with TestKafkaBroker(broker) as br: await br.publish({ "user": "John", "user_id": 1, }, "in") @pytest.mark.asyncio async deftest_invalid(): async with TestKafkaBroker(broker) as br: with pytest.raises(pydantic.ValidationError): await br.publish("wrong message", "in")
Running the application
The application can be started using built-in FastStream CLI command.
Before running the service, install FastStream CLI using the following command:
pipinstall"faststream[cli]"
To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.
faststreamrunbasic:app
After running the command, you should see the following output:
INFO-FastStreamappstarting... INFO-input_data|-`HandleMsg`waitingformessages INFO-FastStreamappstartedsuccessfully!ToexitpressCTRL+C
Also, FastStream provides you with a great hot reload feature to improve your Development Experience
faststreamrunbasic:app--reload
And multiprocessing horizontal scaling feature as well:
faststreamrunbasic:app--workers3
You can learn more about CLI features here
Project Documentation
FastStream automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a web view of your documentation on resources available to related teams.
The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message formats the application works with. And most importantly, it won't cost anything - FastStream has already created the docs for you!
Dependencies
FastStream (thanks to FastDepends) has a dependency management system similar to pytest fixtures and FastAPI Depends at the same time. Function arguments declare which dependencies you want are needed, and a special decorator delivers them from the global Context object.
fromtypingimport Annotated fromfaststreamimport Depends, Logger async defbase_dep(user_id: int) -> bool: return True @broker.subscriber("in-test") async defbase_handler(user: str, logger: Logger, dep: Annotated[bool, Depends(base_dep)]): assert dep is True logger.info(user)
HTTP Frameworks integrations
Any Framework
You can use FastStream MQBrokers without a FastStream application.
Just start and stop them according to your application's lifespan.
fromaiohttpimport web fromfaststream.kafkaimport KafkaBroker broker = KafkaBroker("localhost:9092") @broker.subscriber("test") async defbase_handler(body): print(body) async defstart_broker(app): await broker.start() async defstop_broker(app): await broker.stop() async defhello(request): return web.Response(text="Hello, world") app = web.Application() app.add_routes([web.get("/", hello)]) app.on_startup.append(start_broker) app.on_cleanup.append(stop_broker) if __name__ == "__main__": web.run_app(app)
FastAPI Plugin
Also, FastStream can be used as part of FastAPI.
Just import a StreamRouter you need and declare the message handler with the same @router.subscriber(...) and @router.publisher(...) decorators.
fromfastapiimport FastAPI frompydanticimport BaseModel fromfaststream.kafka.fastapiimport KafkaRouter router = KafkaRouter("localhost:9092") classIncoming(BaseModel): m: dict @router.subscriber("test") @router.publisher("response") async defhello(m: Incoming): return {"response": "Hello, world!"} app = FastAPI() app.include_router(router)
More integration features can be found here
Benchmarks
We use codspeed to run benchmarks for both FastStream itself and raw clients.
Stay in touch
Please show your support and stay in touch by:
-
giving our GitHub repository a star, and
-
joining our EN Discord server
-
joining our RU Telegram group
Your support helps us to stay in touch with you and encourages us to continue developing and improving the framework. Thank you for your support!
Contributors
Thanks to all of these amazing people who made the project better!
π ImageProject details
Verified details
These details have been verified by PyPIOwner
Maintainers
π Avatar for lancetnik from gravatar.comlancetnik
Unverified details
These details have not been verified by PyPIProject links
Meta
-
License Expression: Apache-2.0
SPDX License Expression - Author: AG2AI, Nikita Pastukhov
- Tags rabbitmq , kafka , nats , redis , mqtt , asyncapi , framework , message brokers
- Requires: Python >=3.10
-
Provides-Extra:
cli,confluent,kafka,mqtt,nats,otel,prometheus,rabbit,redis
Classifiers
- Development Status
- Environment
- Framework
- Intended Audience
- License
- Operating System
- Programming Language
- Topic
- Typing
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file faststream-0.7.1.tar.gz.
File metadata
- Download URL: faststream-0.7.1.tar.gz
- Upload date:
- Size: 329.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
11816dd4ca678f6b173159c456619993b09684b684abe4010366d6a0c8059c58
|
|
| MD5 |
c4b28bff10225dc093f22dbf928e219c
|
|
| BLAKE2b-256 |
5a927f5edc7e1945968331c185ee197b690b413da861649d5859e3f377348acd
|
File details
Details for the file faststream-0.7.1-py3-none-any.whl.
File metadata
- Download URL: faststream-0.7.1-py3-none-any.whl
- Upload date:
- Size: 556.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d4f971f6ba7621a05a67a8fcb4436fdcf89b5f21b33eb107f193b1c9553738a0
|
|
| MD5 |
d1786342360642c322beccaaae7eb0d7
|
|
| BLAKE2b-256 |
122e73d58856a305a73ac31d3d950f9b96a2ef4c1a6071e38b339d1b0b466418
|
