Requests
Request/response is a commonly used message pattern where one service sends a request to another service, continuing after the response is received. In a distributed system, this can increase the latency of an application since the service may be hosted in another process, on another machine, or may even be a remote service in another network. While in many cases it is best to avoid request/response use in distributed applications, particularly when the request is a command, it is often necessary and preferred over more complex solutions.
In MassTransit, developers use a request client to send or publish requests and wait for a response. The request client is asynchronous and supports use of the await keyword since it returns a Task.
Create the request message contracts
Section titled “Create the request message contracts”To use the request client, create two message contracts, one for the request and one for the response.
publicrecordCheckOrderStatus{publicstring OrderId { get; init; }}publicrecordOrderStatusResult{publicstring OrderId { get; init; }public DateTime Timestamp { get; init; }publicshort StatusCode { get; init; }publicstring StatusText { get; init; }}Create a request consumer
Section titled “Create a request consumer”Request messages can be handled by any consumer type, including consumers, sagas, and routing slips. In this case, the consumer below consumes the CheckOrderStatus message and responds with the OrderStatusResult message.
publicclassCheckOrderStatusConsumer(IOrderRepository orderRepository) :IConsumer<CheckOrderStatus>{publicasync Task Consume(ConsumeContext<CheckOrderStatus> context){var order =awaitorderRepository.Get(context.Message.OrderId);if (order ==null)thrownew InvalidOperationException("Order not found");awaitcontext.RespondAsync<OrderStatusResult>(new{OrderId =order.Id,order.Timestamp,order.StatusCode,order.StatusText});}}If the OrderId is found in the repository, an OrderStatusResult message will be sent to the response address included with the request. The waiting request client will handle the response and complete the returned Task allowing the requesting application to continue.
If the OrderId was not found, the consumer throws an exception. MassTransit catches the exception, generates a Fault<CheckOrderStatus> message, and sends it
to the response address. The request client handles the fault message and throws a RequestFaultException via the awaited Task containing the exception
detail.
Reference the request client
Section titled “Reference the request client”To use the request client, add the request client as a dependency as shown in the example API controller below.
publicclassRequestController(IRequestClient<CheckOrderStatus> client) :Controller{[HttpGet("{orderId}")]publicasync Task<IActionResult> Get(string orderId, CancellationToken cancellationToken){var response =awaitclient.GetResponse<OrderStatusResult>(new { orderId }, cancellationToken);returnOk(response.Message);}}The controller method will send the request and return the order status after the response has been received.
If the cancellationToken passed to GetResponse is canceled, the request client will stop waiting for a response. However, the request message produced remains in the queue until it is consumed or the message time-to-live expires. By default, the message time-to-live is set to the request timeout (which defaults to 30 seconds).
Configure the request client (optional)
Section titled “Configure the request client (optional)”A request client can be resolved using dependency injection for any valid message type, no configuration is required. By default, request messages are published and should be consumed by only one consumer/receive endpoint connected to the message broker. Multiple consumers connected to the same Receive endpoint are fine, requests will be load balanced across the connected consumers.
To configure the request client for a message type, add the request client to the configuration explicitly.
services.AddMassTransit(x =>{// configure the consumer on a specific endpoint addressx.AddConsumer<CheckOrderStatusConsumer>().Endpoint(e =>e.Name="order-status");// Sends the request to the specified address, instead of publishing itx.AddRequestClient<CheckOrderStatus>(new Uri("exchange:order-status"));x.UsingInMemory((context, cfg) =>{cfg.ConfigureEndpoints(context);}));});Configure AutoStart on the bus
Section titled “Configure AutoStart on the bus”By default, the bus endpoint used by the request client is not started until immediately prior to the first request. This is intentional to reduce overhead when you’re not using the request client or similar features that rely on the bus endpoint.
To decrease the latency of the first request, you can configure the bus endpoint to start when the bus is started by specifying AutoStart = true when
configuring the bus as shown below.
services.AddMassTransit(x =>{x.UsingRabbitMq((context, cfg) =>{cfg.AutoStart=true; // default = false, starts the bus endpoint with the bus});});Set request headers
Section titled “Set request headers”To create a request and add a header to the SendContext, one option is to add an Execute filter to the request pipeline.
awaitclient.GetResponse<OrderStatusResult>(new GetOrderStatus{ OrderId = orderId },x =>x.UseExecute(context =>context.Headers.Set("tenant-id", "some-value")));Another option is to use the object values overload, which uses a message initializer, to specify the header value. Learn more about message initializers in the Concepts section.
awaitclient.GetResponse<OrderStatusResult>(new{orderId,__Header_Tenant_Id ="some-value"});Accept multiple response types
Section titled “Accept multiple response types”Another powerful feature with the request client is the ability to support multiple (such as positive and negative) result types. For example, adding an
OrderNotFound response type to the consumer as shown eliminates throwing an exception since a missing order isn’t really a fault.
publicclassCheckOrderStatusConsumer :IConsumer<CheckOrderStatus>{publicasync Task Consume(ConsumeContext<CheckOrderStatus> context){var order =await_orderRepository.Get(context.Message.OrderId);if (order ==null)awaitcontext.RespondAsync<OrderNotFound>(context.Message);elseawaitcontext.RespondAsync<OrderStatusResult>(new{OrderId =order.Id,order.Timestamp,order.StatusCode,order.StatusText});}}The client can now wait for multiple response types (in this case, two) by using a little tuple magic.
var response =awaitclient.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id});if (response.Is(out Response<OrderStatusResult> responseA)){// do something with the order}elseif (response.Is(out Response<OrderNotFound> responseB)){// the order was not found}This cleans up the processing, and eliminates the need to catch a RequestFaultException.
It’s also possible to use some of the switch expressions via deconstruction, but this requires the response variable to be explicitly specified as Response.
Response response =awaitclient.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id});// Using a regular switch statementswitch (response){case (_, OrderStatusResult a) responseA:// order foundbreak;case (_, OrderNotFound b) responseB:// order not foundbreak;}// Or using a switch expressionvar accepted = response switch{(_, OrderStatusResult a) =>true,(_, OrderNotFound b) =>false,_=>thrownew InvalidOperationException()};Check response types accepted
Section titled “Check response types accepted”The request client sets a message header, MT-Request-AcceptType, that contains the response types supported by the request client. This allows the request
consumer to determine if the client can handle a response type, which can be useful as services evolve and new response types may be added to handle new
conditions. For instance, if a consumer adds a new response type, such as OrderAlreadyShipped, if the response type isn’t supported an exception may be thrown
instead.
To see this in code, check out the client code:
var response =awaitclient.GetResponse<OrderCanceled, OrderNotFound>(new CancelOrder());if (response.Is(out Response<OrderCanceled> canceled)){returnOk();}elseif (response.Is(out Response<OrderNotFound> responseB)){returnNotFound();}The original consumer, before adding the new response type:
publicasync Task Consume(ConsumeContext<CancelOrder> context){var order =_repository.Load(context.Message.OrderId);if(order ==null){awaitcontext.ResponseAsync<OrderNotFound>(new { context.Message.OrderId });return;}order.Cancel();awaitcontext.RespondAsync<OrderCanceled>(new { context.Message.OrderId });}Now, the new consumer that checks if the order has already shipped:
publicasync Task Consume(ConsumeContext<CancelOrder> context){var order =_repository.Load(context.Message.OrderId);if(order ==null){awaitcontext.ResponseAsync<OrderNotFound>(new { context.Message.OrderId });return;}if(order.HasShipped){if (context.IsResponseAccepted<OrderAlreadyShipped>()){awaitcontext.RespondAsync<OrderAlreadyShipped>(new { context.Message.OrderId, order.ShipDate });return;}elsethrownew InvalidOperationException("The order has already shipped"); // to throw a RequestFaultException in the client}order.Cancel();awaitcontext.RespondAsync<OrderCanceled>(new { context.Message.OrderId });}This way, the consumer can check the request client response types and act accordingly.
Await multiple requests
Section titled “Await multiple requests”If there were multiple requests to be performed, it is easy to wait on all results at the same time, benefiting from the concurrent operation.
publicclassRequestController(IRequestClient<RequestA> clientA, IRequestClient<RequestB> clientB): Controller{publicasync Task<ActionResult> Get(){var resultA =clientA.GetResponse(new RequestA());var resultB =clientB.GetResponse(new RequestB());awaitTask.WhenAll(resultA, resultB);var a =await resultA;var b =await resultB;var model =new Model(a.Message, b.Message);returnView(model);}}The power of concurrency, for the win!
Request Handle
Section titled “Request Handle”Client factories or the request client can also be used to create a request instead of calling GetResponse. This is an uncommon scenario but is available as an option and may make sense depending on the situation. If a request is created (which returns a RequestHandle<T>), the request handle must be disposed after the request completes.
Using
Createreturns a request handle, which can be used to set headers and other attributes of the request before it is sent.
publicinterface IRequestClient<TRequest>where TRequest : class{RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);}For
RequestTimeoutthree options are available,None,Default, and a factory withRequestTimeout.After.Nonewould never be recommended since it would essentially wait forever for a response. There is always a relevant timeout, or you’re using the wrong pattern.
Request Client Factory
Section titled “Request Client Factory”The internals are documented for understanding, but what follows is optional reading. The above container-based configuration handles all the details to ensure the proper context is used.
The request client is composed of two parts, a client factory and a request client. There are two client factories, the scoped client factory, and the bus client factory.
Use IScopedClientFactory to create a request client
Section titled “Use IScopedClientFactory to create a request client”Using IRequestClient requires a container scope, and the request client for a request message type is resolved from container scope using a scoped client
factory. As an alternative to specifying IRequestClient<T> as a constructor dependency, the scoped client factory can be used instead of create a request
client directly. This can be useful when the destination address may change based on context, such as a TenantId.
publicinterface IScopedClientFactory{IRequestClient<T> CreateRequestClient<T>(RequestTimeout timeout =default)where T : class;IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout =default)where T : class;}An example showing how to use IScopedClientFactory is shown below.
[HttpGet]publicasync Task<IActionResult> HandleGet(string tenantId, int id, [FromServices] IScopedClientFactory clientFactory){var serviceAddress =new Uri($"exchange:check-order-status-{tenantId}");var client =clientFactory.CreateRequestClient<CheckOrderStatus>(serviceAddress);var response =awaitclient.GetResponse<OrderStatusResult>(new { OrderId = id});returnOk();}Use IClientFactory to create a request client
Section titled “Use IClientFactory to create a request client”If there is no container scope available, and one cannot be created, the root client factory can be used instead. Note that non-scoped interfaces are not compatible with scoped publish or send filters.
publicinterface IClientFactory{IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);}An example showing how to use IClientFactory is shown below.
publicasync Task WorkerMethod(IServiceProvider provider){var clientFactory =provider.GetRequiredService<IClientFactory>();var serviceAddress =new Uri("exchange:check-order-status");var client =clientFactory.CreateRequestClient<CheckOrderStatus>(serviceAddress);var response =awaitclient.GetResponse<OrderStatusResult>(new { OrderId = id});}