Eventual Consistency in Microservices

In this post, we're going to look at a topic that most "old school" developers tend to struggle with or push back on, since it requires a different mindset about how applications are structured.

 

 

This post is part of the Distribt course, but we could also include it in a subgroup alongside the CQRS and event sourcing patterns, since they're often discussed together without stopping to think where each pattern truly belongs.

To follow along with the implementation, I recommend having completed the course, so you have the necessary context (it's possible to follow without it, but it helps).

 

 

1 - What is eventual consistency?

As the name suggests, eventual consistency refers to the fact that information throughout the system will become consistent eventually.

 

1.1 - But what does eventually mean?

When working with microservices or distributed applications, we design them so that only the corresponding domain code will modify or access the data within its own system.

 

For example, if we have a products table, only the product service can access it. It is strictly forbidden for outside services, such as purchase orders, to access the database directly.

microservices connection between services and dbs

This is where most "old school" developers feel uneasy. Because we can’t access another service's database directly, as is common in monolithic systems, we must provide alternative solutions, which often means writing more code.

 

A - API Calls

The first alternative is to make an API call whenever we need such information. For example, you store the product ID in orders, and if you want to return the product’s name through the API, you need to call the products API every time there’s a product in an order.

API call between services

As you can imagine, in this particular case, this solution doesn’t help us much, since we would make thousands of calls to the product microservice unnecessarily.

 

We use this approach when we need the information to always be up to date.
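To make the cost of this approach concrete, here is a minimal sketch. FakeProductsApi and OrderNameResolver are hypothetical names that are not part of Distribt, and the "API" is an in-memory stand-in that only counts calls. It shows that resolving names this way takes one call per product line, even when the same product appears twice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the products API; it only counts how often it is called.
public class FakeProductsApi
{
    private readonly Dictionary<int, string> _names = new() { [1] = "Keyboard", [2] = "Mouse" };

    public int Calls { get; private set; }

    public Task<string> GetProductName(int productId)
    {
        Calls++; // in a real system every call is a network round trip
        return Task.FromResult(_names[productId]);
    }
}

public static class OrderNameResolver
{
    // Resolves the name of every product in an order with one API call per line.
    public static async Task<List<string>> ResolveNames(FakeProductsApi api, IEnumerable<int> productIds)
    {
        var names = new List<string>();
        foreach (int id in productIds)
        {
            names.Add(await api.GetProductName(id));
        }
        return names;
    }
}
```

An order with three lines costs three calls, so listing many orders multiplies quickly.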

 

B - Asynchronous Communication for Eventual Consistency

The second option is to use asynchronous communication via the producer-consumer pattern.

 

Basically, once we have created or updated our record in the correct database, we generate an integration event that will be listened to by all systems that need this information.

 

It’s the same logic we saw in the episode about the CQRS pattern, but now we're dealing with events that go beyond our domain, called integration events.

 

For instance, when we update a product’s name, we do that in the product service, and from there we generate an integration event that we publish on the service bus (in our case, RabbitMQ). Other services listen to that event; in our case, the order service, which updates the record in the internal database of its own domain.

  • NOTE: Don’t worry about duplicating information. This is the price to pay, and it’s fully accepted (there is no other really effective way to do it).

Eventual consistency with asynchronous communication

Keep in mind that using asynchronous communication usually adds some delay before all systems in the architecture have the updated information. That’s where the term “eventual consistency” comes from.
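The delay can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: InMemoryBus stands in for RabbitMQ, ProductRenamed is a hypothetical event, and delivery is triggered by hand instead of by a background consumer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical integration event used only in this sketch.
public record ProductRenamed(int ProductId, string Name);

// Hypothetical in-memory stand-in for the service bus.
public class InMemoryBus
{
    private readonly Queue<ProductRenamed> _pending = new();

    public void Publish(ProductRenamed message) => _pending.Enqueue(message);

    // Delivers all pending messages; a real consumer does this continuously in the background.
    public void DeliverPending(Action<ProductRenamed> consumer)
    {
        while (_pending.Count > 0)
        {
            consumer(_pending.Dequeue());
        }
    }
}

public class ProductsService
{
    private readonly InMemoryBus _bus;

    public Dictionary<int, string> Database { get; } = new();

    public ProductsService(InMemoryBus bus) => _bus = bus;

    public void Rename(int productId, string name)
    {
        Database[productId] = name;                        // 1. update our own database
        _bus.Publish(new ProductRenamed(productId, name)); // 2. notify the rest of the system
    }
}

public class OrdersService
{
    // Local copy of the only product data that Orders needs.
    public Dictionary<int, string> ProductNames { get; } = new();

    public void Handle(ProductRenamed message) => ProductNames[message.ProductId] = message.Name;
}
```

Between the call to Rename and the next DeliverPending, the Orders copy still holds the previous name. That window is the "eventual" part.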

 

In normal circumstances, this delay is just a matter of seconds at most, so in 99.9% of cases this method is preferable to using API calls, at least in my personal experience.

 

 

2 - Implementing Eventual Consistency in Microservices

As I mentioned before, in the post on CQRS we dealt with eventual consistency between the write and read databases.

 

We can apply the very same logic here, but for generating integration events, that is, events that connect microservices with other microservices.

 

To do this, once we've updated the read database, we need to be sure that we generate our integration event.

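public class ProductUpdatedHandler : IDomainMessageHandler<ProductUpdated>
{
    private readonly IIntegrationMessagePublisher _integrationMessagePublisher;

    // The publisher is injected; without this constructor the field is never assigned.
    public ProductUpdatedHandler(IIntegrationMessagePublisher integrationMessagePublisher)
    {
        _integrationMessagePublisher = integrationMessagePublisher;
    }

    public async Task Handle(DomainMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {
        // other code

        // Publish the integration event with the "external" routing key so other services can receive it.
        await _integrationMessagePublisher.Publish(
            new ProductUpdated(message.Content.ProductId, message.Content.Details), routingKey: "external", cancellationToken: cancelToken);
    }
}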

This will be listened to by an IntegrationConsumer within the Orders service, which will update the table. By the way, the information we keep in the orders microservice is the current, latest version, unless we’re using bi-temporal data modeling. In simple terms, we update the data.

 

2.1 - Initial Data Load of Dependencies

At some point in our application's lifecycle, we’ll need to make an initial data load from another microservice.

 

Imagine the following scenario: the Products service is already in production and holds several hundred products, and a couple of weeks later you create the Orders service. You’ll need to “initially load” all these products so you can display their names.

 

We have several options for this:

 

A - Initial Load

When we put our service into production, we perform an initial data load. For this, we make one or more calls to the products microservice to read all products and store the information in our database.

 

This load only happens once as the database isn’t (typically) deleted, of course.

And once everything is loaded, we start using new integration events.
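A one-off load like this could be sketched as follows. The paged read and the save delegate are assumptions: the post does not show a paged endpoint on the products API, and InitialProductLoader and ProductSummary are hypothetical names. In practice the save delegate could wrap an upsert, so that running the load twice or receiving an event in the middle of it does no harm.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shape of what a paged products endpoint could return.
public record ProductSummary(int Id, string Name);

public class InitialProductLoader
{
    // Hypothetical: reads one page (page index, page size) from the products microservice.
    private readonly Func<int, int, Task<IReadOnlyList<ProductSummary>>> _getPage;
    // Hypothetical: stores id and name locally, ideally as an upsert.
    private readonly Func<int, string, Task> _saveName;

    public InitialProductLoader(
        Func<int, int, Task<IReadOnlyList<ProductSummary>>> getPage,
        Func<int, string, Task> saveName)
    {
        _getPage = getPage;
        _saveName = saveName;
    }

    // Reads pages until an empty one comes back and returns how many products were stored.
    public async Task<int> Run(int pageSize = 100)
    {
        int loaded = 0;
        for (int page = 0; ; page++)
        {
            IReadOnlyList<ProductSummary> items = await _getPage(page, pageSize);
            if (items.Count == 0)
            {
                break;
            }

            foreach (ProductSummary item in items)
            {
                await _saveName(item.Id, item.Name);
                loaded++;
            }
        }
        return loaded;
    }
}
```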

 

B - Use Already Generated Integration Events

We can configure our service bus so that when a new consumer joins, all previously generated events are also propagated to it. This option often requires some extra setup on the queues and, in the case of RabbitMQ, on the exchanges.
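The idea behind this option is that events are retained somewhere and a new consumer starts reading from the beginning. This matters because a RabbitMQ queue normally only receives messages routed to it after it was bound. The sketch below models the retained log in memory. RetainedEventLog and ProductNameChanged are hypothetical names, and how the log is actually kept is outside what this post covers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical integration event used only in this sketch.
public record ProductNameChanged(int ProductId, string Name);

// Hypothetical append-only log that keeps every published event.
public class RetainedEventLog
{
    private readonly List<ProductNameChanged> _events = new();

    public void Append(ProductNameChanged e) => _events.Add(e);

    // Hands every event from the given offset to the handler
    // and returns the offset to resume from next time.
    public int ReadFrom(int offset, Action<ProductNameChanged> handler)
    {
        for (int i = offset; i < _events.Count; i++)
        {
            handler(_events[i]);
        }
        return _events.Count;
    }
}
```

A consumer that joins late calls ReadFrom(0, ...) once to catch up, keeps the returned offset, and continues from there. Because events are applied in order, the local copy ends up with the latest name for each product.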

 

C - Load Data on the Fly

Initially, we don’t perform any data load and we simply listen to integration events as usual. However, if we want to read an item that doesn’t exist, we call the products API to fetch the information.

 

Which Option to Choose?

Each option has its pros and cons, and it's up to you to find the one that best fits your system or preferences.

Personally, I like option C best, and also combine it with a cache, whether in memory or using Redis.

 

 

2.2 - Eventual Consistency Implementation in C#

As I hinted before, we're going to carry out a "passive" load. That is, we'll load the data only when we first need it and store it in the cache. Of course, the consumer will keep reading events constantly.

 

First, we need to create the consumer, which consumes the ProductUpdated type:

public class ProductModifierHandler : IIntegrationMessageHandler<ProductUpdated>
{
    private readonly IProductNameService _productNameService;

    public ProductModifierHandler(IProductNameService productNameService)
    {
        _productNameService = productNameService;
    }

    public async Task Handle(IntegrationMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {
        await _productNameService.SetProductName(message.Content.ProductId, message.Content.Details.Name, cancelToken);
    }
}

The service is where most of the logic lives: it stores the name in both the database and the cache, and reads it back when needed:

public interface IProductNameService
{
    Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken));
    Task SetProductName(int id, string name, CancellationToken cancellationToken = default(CancellationToken));
}

public class ProductNameService : IProductNameService
{
    private readonly IProductRepository _productRepository;
    private readonly IDistributedCache _cache;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IServiceDiscovery _discovery;

    public ProductNameService(IProductRepository productRepository, IDistributedCache cache,
        IHttpClientFactory httpClientFactory, IServiceDiscovery discovery)
    {
        _productRepository = productRepository;
        _cache = cache;
        _httpClientFactory = httpClientFactory;
        _discovery = discovery;
    }


    public async Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken))
    {
        // Fast path: the name is already in the cache.
        string? value = await _cache.GetStringAsync($"ORDERS-PRODUCT::{id}", cancellationToken);
        if (value != null)
        {
            return value;
        }
        string productName = await RetrieveProductName(id, cancellationToken);

        return productName;
    }

    public async Task SetProductName(int id, string name,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        await _productRepository.UpsertProductName(id, name, cancellationToken);
        await _cache.SetStringAsync($"ORDERS-PRODUCT::{id}", name, cancellationToken);
    }
   
    
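    private async Task<string> RetrieveProductName(int id, CancellationToken cancellationToken)
    {
        string? productName = await _productRepository.GetProductName(id, cancellationToken);

        if (productName == null)
        {
            // Not in our database yet: load it on the fly from the products API.
            // SetProductName stores it in both the database and the cache.
            FullProductResponse product = await GetProduct(id, cancellationToken);
            await SetProductName(id, product.Details.Name, cancellationToken);
            return product.Details.Name;
        }

        // Found in the database but missing from the cache: populate the cache for later reads.
        await _cache.SetStringAsync($"ORDERS-PRODUCT::{id}", productName, cancellationToken);
        return productName;
    }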
    
    private async Task<FullProductResponse> GetProduct(int productId, CancellationToken cancellationToken = default(CancellationToken))
    {
        HttpClient client = _httpClientFactory.CreateClient();
        string productsReadApi =
            await _discovery.GetFullAddress(DiscoveryServices.Microservices.ProductsApi.ApiRead, cancellationToken);
        client.BaseAddress = new Uri(productsReadApi);

        //TODO: replace exception
        return await client.GetFromJsonAsync<FullProductResponse>($"product/{productId}", cancellationToken) ??
               throw new ArgumentException("Product does not exist");
    }
}

As you can see, we're using the IServiceDiscovery interface, which keeps track of the different services in our application automatically, to get the address of the products read API.

 

 

Earlier I mentioned that copying data is acceptable, but of course, we shouldn’t copy more than necessary. In this case, we’ll only need the name, so the table will be id-name, no more data than that.

 

We can use any database, be it NoSQL or SQL. In this example, we’ll use NoSQL since the rest of the microservice uses NoSQL as well. Also, remember to update the cache if needed.

 

Don't forget to listen to the corresponding queue in the consumer and link them in the RabbitMQ configuration:

{
  "queues": [
    {"name":"order-domain-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-domain-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}
  ],
  "bindings": [
    {"source":"order.exchange","vhost":"/","destination":"order-domain-queue","destination_type":"queue","routing_key":"order","arguments":{}},
    {"source":"order.exchange","vhost":"/","destination":"order-queue","destination_type":"queue","routing_key":"external","arguments":{}},
    {"source":"products.exchange","vhost":"/","destination":"order.exchange","destination_type":"exchange","routing_key":"external","arguments":{}}
  ]
}

 

With these changes, we ensure that a message published from the product microservice with the "external" routing key will reach our Orders microservice to be processed.
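To see why the message ends up in order-queue, the bindings can be followed by hand. The sketch below is a simplified in-memory model, not RabbitMQ itself. It assumes both exchanges behave like direct exchanges (exact routing key match), which the configuration shown does not state, and RoutingSketch and Binding are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record Binding(string Source, string Destination, string DestinationType, string RoutingKey);

public static class RoutingSketch
{
    // The same three bindings as in the configuration above.
    public static readonly Binding[] Bindings =
    {
        new("order.exchange", "order-domain-queue", "queue", "order"),
        new("order.exchange", "order-queue", "queue", "external"),
        new("products.exchange", "order.exchange", "exchange", "external"),
    };

    // Returns the queues a message reaches, assuming an exact routing key match on every hop.
    public static List<string> Route(string exchange, string routingKey)
    {
        var queues = new List<string>();
        foreach (Binding b in Bindings.Where(b => b.Source == exchange && b.RoutingKey == routingKey))
        {
            if (b.DestinationType == "queue")
            {
                queues.Add(b.Destination);
            }
            else
            {
                // Exchange-to-exchange binding: keep routing from the destination exchange.
                queues.AddRange(Route(b.Destination, routingKey));
            }
        }
        return queues;
    }
}
```

Under these assumptions, a message published to products.exchange with the "external" key is forwarded to order.exchange and from there to order-queue, while the "order" key on order.exchange only reaches order-domain-queue.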

 

 

Finally, all that’s left is to modify the GetOrder service so it reads from the table residing solely in our microservice via IProductNameService:

public interface IGetOrderService
{
    Task<OrderResponse> Execute(Guid orderId, CancellationToken cancellationToken = default(CancellationToken));
}

public class GetOrderService : IGetOrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductNameService _productNameService;//<- This is the change

    public GetOrderService(IOrderRepository orderRepository, IProductNameService productNameService)
    {
        _orderRepository = orderRepository;
        _productNameService = productNameService;
    }


    public async Task<OrderResponse> Execute(Guid orderId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        OrderDetails orderDetails = await _orderRepository.GetById(orderId, cancellationToken);
        // In a real scenario this implementation would be much bigger.
        
        // SelectAsync is a custom method; check the source code to see it.
        var products = await orderDetails.Products
            .SelectAsync(async p => new ProductQuantityName(p.ProductId, p.Quantity,
                await _productNameService.GetProductName(p.ProductId, cancellationToken)));
        
        return new OrderResponse(orderDetails.Id, orderDetails.Status.ToString(), orderDetails.Delivery,
            orderDetails.PaymentInformation, products.ToList());
    }
}
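If you don't have the source code at hand, a helper like SelectAsync could look roughly like the sketch below. This is not the implementation from the Distribt repository, only a minimal version that would satisfy the call above: it awaits the selector for each item in turn and keeps the original order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableSketch
{
    // Hypothetical equivalent of SelectAsync: applies an async selector to each item, one at a time.
    public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        var results = new List<TResult>();
        foreach (TSource item in source)
        {
            // Awaiting inside the loop keeps the calls sequential and the order stable.
            results.Add(await selector(item));
        }
        return results;
    }
}
```

Running the calls sequentially is the simplest choice. Starting all tasks first and awaiting Task.WhenAll would be faster for long lists, at the cost of hitting the cache or the products API concurrently.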

 

Now, to do a manual test, we need to run at least the Products read API and the Orders API. But for a complete manual test, you should run the following projects:

  • Distribt.Services.Orders
  • Distribt.Services.Consumer
  • Distribt.Services.Products.Api.Read
  • Distribt.Services.Products.Api.Write
  • Distribt.Services.Products.Consumer

 

To check that everything works, we create a purchase order containing the product with ID 1. When we call GetOrder, we get something like this:

get order example 1

Now, if we update that product in the products microservice, the name will eventually (within a second or two) be updated in the Orders microservice, and if we call GetOrder again we can see the updated name.

 

update name with eventual consistency
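If you want to automate this check instead of doing it by hand, keep in mind that an assertion made right after the update may still see the old name. One way around that is to poll until the condition holds or a timeout expires. The helper below is a hypothetical sketch and not part of Distribt; the condition you pass in would be something like "call GetOrder and compare the product name".

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Eventually
{
    // Re-evaluates the condition until it returns true or the timeout expires.
    // Returns true if the condition was met in time, false otherwise.
    public static async Task<bool> IsTrue(Func<Task<bool>> condition, TimeSpan timeout, TimeSpan pollInterval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            if (await condition())
            {
                return true;
            }

            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(pollInterval);
        }
    }
}
```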

 

 

Conclusion

In this post, we’ve discussed:

  • What eventual consistency is
  • How using eventual consistency affects your architecture
  • How to implement eventual consistency in code

 

This post was translated from Spanish. You can see the original one here.
If there is any problem, you can add a comment below or contact me through the website's contact form.


© copyright 2025 NetMentor | All rights reserved
