In this post, we're going to look at a topic that the vast majority of "old school" developers tend to struggle with or push back against, because it requires a mindset quite different from the one they are used to when structuring applications.
This post is part of the Distribt course, but we could also include it in a subgroup alongside the CQRS and event sourcing patterns, since they're often discussed together without stopping to think where each pattern truly belongs.
To follow along with the implementation, I recommend having completed the course, so you have the necessary context (it's possible to follow without it, but it helps).
1 - What is eventual consistency?
As the name suggests, eventual consistency refers to the fact that information throughout the system will become consistent eventually.
1.1 - But what does eventually mean?
When working with microservices or distributed applications, we design them so that only the corresponding domain code will modify or access the data within its own system.
For example, if we have a products table, only the product service can access it. It is strictly forbidden for outside services, such as purchase orders, to access the database directly.
This is where most "old school" developers feel uneasy: because we can't access the database directly, as is common in monolithic systems, we must provide alternative solutions. This often means writing more code.
A - API Calls
The first alternative is to make an API call whenever we need such information. For example, you store the product ID in orders, and if you want to return the product’s name through the API, you need to call the products API every time there’s a product in an order.
As you can imagine, in this particular case, this solution doesn’t help us much, since we would make thousands of calls to the product microservice unnecessarily.
We use this approach when we need the information to always be up to date.
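To make the cost of this approach concrete, here is a minimal sketch of enriching an order with product names through API calls. The names `OrderLine`, `OrderLineWithName` and `OrderReader` are hypothetical and not part of the Distribt code, and the delegate stands in for the real HTTP call to the Products API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types, for illustration only.
public record OrderLine(int ProductId, int Quantity);
public record OrderLineWithName(int ProductId, int Quantity, string Name);

public class OrderReader
{
    // Stands in for an HTTP call to the Products API (assumption).
    private readonly Func<int, Task<string>> _getProductNameFromApi;

    // Counts how many remote calls we have made so far.
    public int RemoteCalls { get; private set; }

    public OrderReader(Func<int, Task<string>> getProductNameFromApi)
    {
        _getProductNameFromApi = getProductNameFromApi;
    }

    public async Task<List<OrderLineWithName>> Enrich(IEnumerable<OrderLine> lines)
    {
        var result = new List<OrderLineWithName>();
        foreach (OrderLine line in lines)
        {
            // One round trip to the Products API per order line.
            RemoteCalls++;
            string name = await _getProductNameFromApi(line.ProductId);
            result.Add(new OrderLineWithName(line.ProductId, line.Quantity, name));
        }
        return result;
    }
}
```

An order with three lines costs three remote calls every time it is read, even when the same product appears twice, which is why this option scales poorly for data that rarely changes.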
B - Asynchronous Communication for Eventual Consistency
The second option is to use asynchronous communication via the producer-consumer pattern.
Basically, once we have created or updated our record in the correct database, we generate an integration event that will be listened to by all systems that need this information.
It’s the same logic we saw in the episode about the CQRS pattern, but now we're dealing with events that go beyond our domain, called integration events.
For instance, when we update a product's name, we do that in the product service, and from there we generate an integration event that we publish on the service bus (in our case, RabbitMQ), where other services listen for it. In our case, the listener is the order service, which will update the record in the internal database of its own domain.
- NOTE: Don't worry about duplicating information. This is the price to pay, and it's fully accepted (there is no other really effective way to do it).
Keep in mind that using asynchronous communication usually adds some delay before all systems in the architecture have the updated information. That’s where the term “eventual consistency” comes from.
In normal circumstances, this delay is just a matter of seconds at most, so in 99.9% of cases this method is preferable to using API calls, at least in my personal experience.
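The delay described above can be illustrated with a small in-memory sketch. It is not how Distribt is implemented: a `Channel` stands in for RabbitMQ, dictionaries stand in for each service's database, and `ProductRenamed`, `ProductService` and `OrderService` are hypothetical names chosen for the example.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical integration event: the only data that crosses the service boundary.
public record ProductRenamed(int ProductId, string Name);

public class ProductService
{
    // Owned by the Products service only.
    private readonly Dictionary<int, string> _products = new();
    private readonly ChannelWriter<ProductRenamed> _bus;

    public ProductService(ChannelWriter<ProductRenamed> bus) => _bus = bus;

    public async Task Rename(int id, string name)
    {
        _products[id] = name;                                 // 1. write to our own database
        await _bus.WriteAsync(new ProductRenamed(id, name));  // 2. publish the integration event
    }
}

public class OrderService
{
    // Local copy of the names, owned by the Orders service.
    private readonly Dictionary<int, string> _productNames = new();
    private readonly ChannelReader<ProductRenamed> _bus;

    public OrderService(ChannelReader<ProductRenamed> bus) => _bus = bus;

    public string? GetProductName(int id) =>
        _productNames.TryGetValue(id, out var name) ? name : null;

    // Applies every event currently waiting on the bus and returns how many were applied.
    public int ConsumePending()
    {
        int applied = 0;
        while (_bus.TryRead(out var evt))
        {
            _productNames[evt.ProductId] = evt.Name;
            applied++;
        }
        return applied;
    }
}
```

If both services share a bus created with `Channel.CreateUnbounded<ProductRenamed>()`, then right after `Rename(1, "Keyboard")` the order service still has no name for product 1. Only once `ConsumePending()` runs does its local copy catch up. That window between the write and the consumption is the "eventual" part.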
2 - Implementing Eventual Consistency in Microservices
As I mentioned before, in the post on CQRS we dealt with eventual consistency between the write and read databases.
We can apply the very same logic here, but for generating integration events, that is, events that connect microservices with other microservices.
To do this, once we've updated the read database, we need to be sure that we generate our integration event.
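public class ProductUpdatedHandler : IDomainMessageHandler<ProductUpdated>
{
    private readonly IIntegrationMessagePublisher _integrationMessagePublisher;

    public ProductUpdatedHandler(IIntegrationMessagePublisher integrationMessagePublisher)
    {
        _integrationMessagePublisher = integrationMessagePublisher;
    }

    public async Task Handle(DomainMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {
        // other code
        // Publish the integration event so that other microservices can react to the change
        await _integrationMessagePublisher.Publish(
            new ProductUpdated(message.Content.ProductId, message.Content.Details),
            routingKey: "external", cancellationToken: cancelToken);
    }
}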
This will be listened to by an IntegrationConsumer within the Orders service, which will update the table. By the way, the information we keep in the orders microservice is the current, latest version, unless we're using bi-temporal data modeling. In simple terms, we update the data.
2.1 - Initial Data Load of Dependencies
At some point in our application's lifecycle, we’ll need to make an initial data load from another microservice.
Imagine the following scenario: you've loaded several hundred elements because the Products service is already in production, and a couple of weeks later you create the Orders service. Therefore, you’ll need to “initially load” all these products so you can display their names.
We have several options for this:
A - Initial Load
When we put our service into production, we perform an initial data load. For this, we make one or more calls to the products microservice to read all products and store the information in our database.
This load only happens once as the database isn’t (typically) deleted, of course.
And once everything is loaded, we start using new integration events.
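As a rough sketch of what such an initial load could look like, assuming the Products API can return products in pages: the `fetchPage` delegate stands in for that paged call and `store` stands in for an upsert into the Orders database. Both delegates, the `ProductName` record and the `InitialProductLoad` class are hypothetical, not part of Distribt.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical DTO holding only the data the Orders service needs.
public record ProductName(int Id, string Name);

public static class InitialProductLoad
{
    // fetchPage(pageIndex, pageSize, token): assumed paged read from the Products API.
    // store(id, name, token): assumed upsert into the Orders database.
    // Returns the number of products copied.
    public static async Task<int> Run(
        Func<int, int, CancellationToken, Task<IReadOnlyList<ProductName>>> fetchPage,
        Func<int, string, CancellationToken, Task> store,
        int pageSize = 100,
        CancellationToken cancellationToken = default)
    {
        int loaded = 0;
        for (int page = 0; ; page++)
        {
            IReadOnlyList<ProductName> batch = await fetchPage(page, pageSize, cancellationToken);
            foreach (ProductName product in batch)
            {
                await store(product.Id, product.Name, cancellationToken);
                loaded++;
            }

            // A page shorter than pageSize means there is nothing left to read.
            if (batch.Count < pageSize)
            {
                break;
            }
        }
        return loaded;
    }
}
```

Because `store` is an upsert, running the load again is harmless, and events that arrive while the load is in progress simply overwrite the copied value with a newer one.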
B - Use Already Generated Integration Events
We can configure our service bus so that when a new consumer joins, all previously generated events are also propagated. This option often requires some extra setup on the queues and, in the case of RabbitMQ, on the exchanges.
C - Load Data on the Fly
Initially, we don’t perform any data load and we simply listen to integration events as usual. However, if we want to read an item that doesn’t exist, we call the products API to fetch the information.
Which Option to Choose?
Each option has its pros and cons, and it's up to you to find the one that best fits your system or preferences.
Personally, I like option C best, and also combine it with a cache, whether in memory or using Redis.
2.2 - Eventual Consistency Implementation in C#
As I hinted before, we're going to carry out a "passive" load. That is, we'll load the data only when we first need it and store it in the cache. Of course, the consumer will keep reading events constantly.
First, we need to create the consumer, which consumes the ProductUpdated type:
public class ProductModifierHandler : IIntegrationMessageHandler<ProductUpdated>
{
    private readonly IProductNameService _productNameService;

    public ProductModifierHandler(IProductNameService productNameService)
    {
        _productNameService = productNameService;
    }

    public async Task Handle(IntegrationMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {
        await _productNameService.SetProductName(message.Content.ProductId, message.Content.Details.Name, cancelToken);
    }
}
The service is where most of the logic lives, inserting the name into both the cache and the database:
public interface IProductNameService
{
    Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken));
    Task SetProductName(int id, string name, CancellationToken cancellationToken = default(CancellationToken));
}

public class ProductNameService : IProductNameService
{
    private readonly IProductRepository _productRepository;
    private readonly IDistributedCache _cache;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IServiceDiscovery _discovery;

    public ProductNameService(IProductRepository productRepository, IDistributedCache cache,
        IHttpClientFactory httpClientFactory, IServiceDiscovery discovery)
    {
        _productRepository = productRepository;
        _cache = cache;
        _httpClientFactory = httpClientFactory;
        _discovery = discovery;
    }

    public async Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken))
    {
        // 1. Try the cache first
        string? value = await _cache.GetStringAsync($"ORDERS-PRODUCT::{id}", cancellationToken);
        if (value != null)
        {
            return value;
        }

        // 2. Fall back to the local database and, if needed, to the Products API
        string productName = await RetrieveProductName(id, cancellationToken);
        return productName;
    }

    public async Task SetProductName(int id, string name,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // Keep the local database and the cache in sync
        await _productRepository.UpsertProductName(id, name, cancellationToken);
        await _cache.SetStringAsync($"ORDERS-PRODUCT::{id}", name, cancellationToken);
    }

    private async Task<string> RetrieveProductName(int id, CancellationToken cancellationToken)
    {
        string? productName = await _productRepository.GetProductName(id, cancellationToken);
        if (productName == null)
        {
            // Not stored locally yet: load it on the fly from the Products API and store it
            FullProductResponse product = await GetProduct(id, cancellationToken);
            await SetProductName(id, product.Details.Name, cancellationToken);
            productName = product.Details.Name;
        }
        return productName;
    }

    private async Task<FullProductResponse> GetProduct(int productId, CancellationToken cancellationToken = default(CancellationToken))
    {
        HttpClient client = _httpClientFactory.CreateClient();
        string productsReadApi =
            await _discovery.GetFullAddress(DiscoveryServices.Microservices.ProductsApi.ApiRead, cancellationToken);
        client.BaseAddress = new Uri(productsReadApi);

        //TODO: replace exception
        return await client.GetFromJsonAsync<FullProductResponse>($"product/{productId}", cancellationToken) ??
               throw new ArgumentException("Product does not exist");
    }
}
As you can see, we're using the IServiceDiscovery interface, which registers the different services in our application automatically. Here we use it to resolve the address of the Products read API.
Earlier I mentioned that copying data is acceptable, but of course, we shouldn't copy more than necessary. In this case, we'll only need the name, so the table will be id-name, no more data than that.
We can use any database, be it NoSQL or SQL. In this example, we’ll use NoSQL since the rest of the microservice uses NoSQL as well. Also, remember to update the cache if needed.
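The repository behind that id-name table is not shown in this post. As a minimal sketch, the interface below is inferred from how ProductNameService calls it, so the real IProductRepository in Distribt may differ. The in-memory class `InMemoryProductRepository` is a hypothetical stand-in, useful for tests, that shows the upsert semantics we rely on.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Shape inferred from the calls made in ProductNameService (assumption).
public interface IProductRepository
{
    Task<string?> GetProductName(int id, CancellationToken cancellationToken = default);
    Task UpsertProductName(int id, string name, CancellationToken cancellationToken = default);
}

// Hypothetical in-memory stand-in for the id-name table.
public class InMemoryProductRepository : IProductRepository
{
    private readonly ConcurrentDictionary<int, string> _names = new();

    public Task<string?> GetProductName(int id, CancellationToken cancellationToken = default)
    {
        // Returns null when we have never stored this product.
        _names.TryGetValue(id, out string? name);
        return Task.FromResult(name);
    }

    public Task UpsertProductName(int id, string name, CancellationToken cancellationToken = default)
    {
        // Insert or overwrite: we only keep the latest name.
        _names[id] = name;
        return Task.CompletedTask;
    }
}
```

The upsert matters because the same product can reach us twice, once through the on-the-fly load and again through an integration event, and in both cases we just want the latest name to win.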
Don't forget to listen to the corresponding queue in the consumer and link them in the RabbitMQ configuration:
{
  "queues": [
    {"name":"order-domain-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-domain-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}
  ],
  "bindings": [
    {"source":"order.exchange","vhost":"/","destination":"order-domain-queue","destination_type":"queue","routing_key":"order","arguments":{}},
    {"source":"order.exchange","vhost":"/","destination":"order-queue","destination_type":"queue","routing_key":"external","arguments":{}},
    {"source":"products.exchange","vhost":"/","destination":"order.exchange","destination_type":"exchange","routing_key":"external","arguments":{}}
  ]
}
With these changes, we ensure that a message published from the product microservice with the "external" routing key will reach our Orders microservice to be processed.
Finally, all that's left is to modify the GetOrder service so it reads from the table residing solely in our microservice via IProductNameService:
public interface IGetOrderService
{
    Task<OrderResponse> Execute(Guid orderId, CancellationToken cancellationToken = default(CancellationToken));
}

public class GetOrderService : IGetOrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductNameService _productNameService; //<- This is the change

    public GetOrderService(IOrderRepository orderRepository, IProductNameService productNameService)
    {
        _orderRepository = orderRepository;
        _productNameService = productNameService;
    }

    public async Task<OrderResponse> Execute(Guid orderId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        OrderDetails orderDetails = await _orderRepository.GetById(orderId, cancellationToken);

        // In a real scenario this implementation would be much bigger.
        // SelectAsync is a custom method; go to the source code to check it out.
        var products = await orderDetails.Products
            .SelectAsync(async p => new ProductQuantityName(p.ProductId, p.Quantity,
                await _productNameService.GetProductName(p.ProductId, cancellationToken)));

        return new OrderResponse(orderDetails.Id, orderDetails.Status.ToString(), orderDetails.Delivery,
            orderDetails.PaymentInformation, products.ToList());
    }
}
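The SelectAsync method used above lives in the Distribt source code and is not reproduced in this post. If you want to try the snippet on its own, one possible way to write such an extension is sketched below. This is an assumption about its shape, not the actual implementation, and the class name `EnumerableAsyncExtensions` is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerableAsyncExtensions
{
    // Starts the async selector for every element, then awaits all of them.
    // Task.WhenAll returns the results in the same order as the source.
    public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        return await Task.WhenAll(source.Select(selector));
    }
}
```

Note that this version runs the lookups concurrently. If the selector touches something that does not support concurrent use, a sequential `foreach` with `await` inside would be the safer choice.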
Now, to do a manual test, we need to run at least the Products read API and the Orders API. But for a complete manual test, you should run the following projects:
- Distribt.Services.Orders
- Distribt.Services.Consumer
- Distribt.Services.Products.Api.Read
- Distribt.Services.Products.Api.Write
- Distribt.Services.Products.Consumer
To test that everything works correctly, we create a purchase order with a product with ID 1. When we then call GetOrder, the response includes the product's name alongside its ID and quantity.
Now, if we update that product in the products microservice, eventually (after a second or two) the name will be updated in the Orders microservice, and if we do another GetOrder we can see the updated name.
Conclusion
In this post, we've discussed:
- What eventual consistency is
- How using eventual consistency affects your architecture
- How to implement eventual consistency in code
If you run into any problems, you can add a comment below or contact me through the website's contact form.