Consistencia eventual en microservicios

En este post vamos a ver uno de los puntos donde la gran mayoría de desarrolladores “vieja escuela” suele caer o suele estar en contra, debido a que este  tipo de desarrolladores tiene una mentalidad de como estrucutrar las aplicaciones muy diferente.

 

 

Este post esta dentro del curso de Distribt, pero a su vez podríamos incluirlo en un subgrupo junto al patrón CQRS y event sourcing, ya que normalmente se ven todos juntos sin pararse a pensar donde entra cada patrón.

Para seguir la parte de la implementación, recomiendo haber seguido el curso, así tendrás el contexto necesario (se puede seguir sin el contexto, pero ayuda).

 

 

1 - Que es la consistencia eventual?

Como su nombre indica, la consistencia eventual es la forma en la que denominamos a que la información va a ser consistente en todo el sistema eventualmente.

 

1.1 - Pero, qué quiere decir eventualmente? 

Cuando trabajamos con microservicios, o con aplicaciones distribuidas los creamos con la idea de que solo el código del dominio correspondiente va a modificar o acceder a los datos de su propio sistema. 

 

Por ejemplo, si tenemos una tabla con productos, únicamente el servicio de productos puede acceder a ellos, está totalmente prohibido que servicios externos como pueden ser las órdenes de compra accedan a la información de la base de datos directamente. 

microservicios connection between services and dbs

Y aquí es donde viene toda la incertidumbre de los desarrolladores “vieja escuela” al no poder acceder a la base de datos directamente como se suele hacer en los sistemas monolíticos debemos proveer diferentes alternativas, lo que normalmente requiere más código. 

 

A - Llamadas API 

La primera de las alternativas es hacer una llamada API cada vez que necesitemos dicha información, por ej, tu te guardas en los pedidos el Id del producto, y si quieres devolver en la api dicho nombre, lo que tienes que hacer es una llamada a la api de productos cada vez que haya un producto en un pedido.

Llamada api entre servicios

Como puedes imaginar, para este caso en concreto, esta solución no nos sirve, ya que realizaremos miles de llamadas al microservicio de productos cuando no es totalmente necesario.

 

Utilizaremos este escenario cuando tenemos que tener información siempre actualizada.

 

B - Comunicación asíncrona para la consistencia eventual

La segunda de las opciones es utilizar comunicación asíncrona a través del patrón productor consumidor

 

Básicamente, una vez tenemos nuestro registro en la base de datos correcta, creamos un evento de integración el cual va a ser escuchado por todos los sistemas que utilizan dicha información.

 

Es la mimsma lógica que vimos en el episodio sobre el patrón CRQS, la diferencia es que ahora trabajamos con los eventos que van fuera de nuestro dominio, los cuales llamamos, eventos de integración.

 

Por ejemplo, cuando actualizamos el nombre de un producto, lo hacemos en el servicio producto, y de ahí generamos un evento de integración que publicamos en el service bus (En nuestro caso RabbitMQ), el cual será escuchado por otros servicios. En nuestro caso el servicio de pedidos, el cual actualizará el registro en la base de datos interna de su dominio.

  • NOTA: No te preocupes por duplicar la información es el precio a pagar, está totalmente aceptado (en verdad no hay otra forma de hacerlo lo más efectivo posible) 

Eventual consistency con comunicación asíncrona

Ten en cuenta que utilizar comunicación asíncrona suele añadir una demora antes de que todos los sistemas de la arquitectura tengan la información actualizada, de ahí el nombre de “consistencia eventual”. 

 

En circunstancias normales esta demora es de unos segundos como máximo, así que para el 99.9% de casos es preferible este método utilizar llamadas API, por lo menos en mi experiencia personal.

 

 

2 - Implementación de consistencia Eventual en microservicios

Como he dicho antes, cuando vimos el post sobre CQRS vimos consistencia eventual, entre la base de datos de escritura y la de lectura.

 

Podemos aplicar exactamente la misma lógica aquí, pero para crear eventos de integración. Lo que quiere decir, eventos que comunican microservicios con otros microservicios. 

 

Para ello una vez que hemos actualizado la base de datos de lectura, debemos estar seguros de que generamos nuestro evento de integración.

public class ProductUpdatedHandler : IDomainMessageHandler<ProductUpdated>
{
    private readonly IIntegrationMessagePublisher _integrationMessagePublisher;

    public async Task Handle(DomainMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {

     // otro código

	   
        await _integrationMessagePublisher.Publish(
            new ProductUpdated(message.Content.ProductId, message.Content.Details), routingKey:"external", cancellationToken: cancelToken);
    }
}

El cual será escuchado por un IntegrationConsumer dentro del servicio de Orders el cual actualizará la tabla. Por cierto, la información que mantenemos en el microservico de orders es la actual, la última, a no ser que estemos aplicando bi-temporal data modeling, así que lo que hacemos es actualizar los datos.

 

2.1 - Carga inicial de datos de las dependencias

En algún punto del ciclo de vida de nuestra aplicación vamos a tener que hacer una carga de datos desde el otro microservicio. 

 

Imagina el siguiente escenario, llevas incluidos varios cientos de elementos porque el servicio de Productos ya está en producción y un par de semanas más tarde creas el de Orders, por lo tanto, todos esos productos vas a tener que “cargarlos inicialmente” para así poder mostrar el nombre.

 

Para ello tenemos múltiples opciones:

 

A - Carga inicial

Cuando ponemos nuestro servicio en producción lo que hacemos es una carga inicial de datos y para ello hacemos una llamada, o varias, al microservicio de productos para leer todos los productos y almacenar la información en nuestra base de datos.

 

Esta carga, únicamente sucede una vez ya que la base de datos no va a ser borrada, en principio, claro. 

Y una vez están cargados, ya utilizamos los eventos de integración nuevos. 

 

B - Usar los eventos de integración ya generados

Podemos configurar nuestro service bus para que cuando un nuevo consumidor se una se propaguen todos los eventos previamente generados. Esta opción suele llevar algo de configuración adicional en las colas y en el caso de RabbitMQ en los exchanges

 

C - Cargar datos al vuelo

Inicialmente no hacemos una carga de datos, y escuchamos los eventos de integración con normalidad, pero, si queremos leer un elemento que no existe, lo que hacemos es llamar a la API de productos para leer dicha información.

 

Qué opción elegir?

Cada opción tiene pros y contras, es función de encontrar la que más se adapte a tu sistema o tus gustos.

A mí personalmente me gusta la opción C y además combinarla junto a una caché ya bien sea en memoria o en redis. 

 

 

2.2 - Implementación de consistencia eventual en C#

Como he dejado caer antes vamos a hacer una carga “pasiva”, osea vamos a cargar los datos únicamente cuando los necesitemos una primera vez y los almacenaremos en la caché, por supuesto el consumidor va a estar leyendo los eventos continuamente.

 

Lo primero que tenemos que hacer es crear el consumidor, el cual consume el tipo ProductUpdated

public class ProductModifierHandler : IIntegrationMessageHandler<ProductUpdated>
{
    private readonly IProductNameService _productNameService;

    public ProductModifierHandler(IProductNameService productNameService)
    {
        _productNameService = productNameService;
    }

    public async Task Handle(IntegrationMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken))
    {
        await _productNameService.SetProductName(message.Content.ProductId, message.Content.Details.Name, cancelToken);
    }
}

Y en el servicio es donde hacemos la mayoría de la lógica, donde lo insertamos en la caché y en la base de datos: 

public interface IProductNameService
{
    Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken));
    Task SetProductName(int id, string name, CancellationToken cancellationToken = default(CancellationToken));
}

public class ProductNameService : IProductNameService
{
    private readonly IProductRepository _productRepository;
    private readonly IDistributedCache _cache;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IServiceDiscovery _discovery;

    public ProductNameService(IProductRepository productRepository, IDistributedCache cache,
        IHttpClientFactory httpClientFactory, IServiceDiscovery discovery)
    {
        _productRepository = productRepository;
        _cache = cache;
        _httpClientFactory = httpClientFactory;
        _discovery = discovery;
    }


    public async Task<string> GetProductName(int id, CancellationToken cancellationToken = default(CancellationToken))
    {
        string value = await _cache.GetStringAsync($"ORDERS-PRODUCT::{id}", cancellationToken);
        if (value!=null)
        {
            return value;
        }
        string productName = await RetrieveProductName(id, cancellationToken);

        return productName;
    }

    public async Task SetProductName(int id, string name,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        await _productRepository.UpsertProductName(id, name, cancellationToken);
        await _cache.SetStringAsync($"ORDERS-PRODUCT::{id}", name, cancellationToken);
    }
   
    
    private async Task<string> RetrieveProductName(int id, CancellationToken cancellationToken)
    {
        string? productName = await _productRepository.GetProductName(id, cancellationToken);

        if (productName == null)
        {
            FullProductResponse product = await GetProduct(id, cancellationToken);
            await SetProductName(id, product.Details.Name, cancellationToken);
            productName = product.Details.Name;
        }

        return productName;
    }
    
    private async Task<FullProductResponse> GetProduct(int productId, CancellationToken cancellationToken = default(CancellationToken))
    {
        HttpClient client = _httpClientFactory.CreateClient();
        string productsReadApi =
            await _discovery.GetFullAddress(DiscoveryServices.Microservices.ProductsApi.ApiRead, cancellationToken);
        client.BaseAddress = new Uri(productsReadApi);

        //TODO: replace exception
        return await client.GetFromJsonAsync<FullProductResponse>($"product/{productId}", cancellationToken) ?? 
               throw  new ArgumentException("Product does not exist");
    }
}

Como ves estamos utilizando la interfaz IServiceDiscovery la cual registra los diferentes servicios de nuestra aplicación de forma automática.

 

 

Anteriormente he indicado que copiar datos está bien, que no pasa nada, pero claro tampoco tenemos que copiar más de los que necesitamos, en este caso vamos a necesitar únicamente el nombre, así que la tabla va a ser id-nombre, ningún dato más. 

 

Podemos utilizar cualquier base de datos, ya bien sea NoSQL o SQL, en este caso vamos a utilizar NoSQL ya que el resto del microservicio utiliza NoSQL. También recuerda modificar la caché si es necesario. 

 

No te olvides de escuchar la cola correspondiente en el consumer y enlazarlas en la configuración de RabbitMQ:

{
  "queues": [
    {"name":"order-domain-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-domain-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
    {"name":"order-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}},
  ],
  "bindings": [
    {"source":"order.exchange","vhost":"/","destination":"order-domain-queue","destination_type":"queue","routing_key":"order","arguments":{}},
    {"source":"order.exchange","vhost":"/","destination":"order-queue","destination_type":"queue","routing_key":"external","arguments":{}},
    {"source":"products.exchange","vhost":"/","destination":"order.exchange","destination_type":"exchange","routing_key":"external","arguments":{}}
  ]
}

 

Con estos cambios conseguiremos que un mensaje publicado desde el microservicio de productos con el routing key de “external” llegue a nuestro microservicio de Orders para ser procesado.

 

 

Finalmente únicamente nos queda modificar el servicio de GetOrder para leer de nuestra tabla que esta únicamente en nuestro microservicio a través de IProductNameService:

public interface IGetOrderService
{
    Task<OrderResponse> Execute(Guid orderId, CancellationToken cancellationToken = default(CancellationToken));
}

public class GetOrderService : IGetOrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductNameService _productNameService;//<- Este es el cambio

    public GetOrderService(IOrderRepository orderRepository, IProductNameService productNameService)
    {
        _orderRepository = orderRepository;
        _productNameService = productNameService;
    }


    public async Task<OrderResponse> Execute(Guid orderId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        OrderDetails orderDetails = await _orderRepository.GetById(orderId, cancellationToken);
        //on a real scenario this implementation will be much bigger.
        
        //SelectAsync is a custom method, go to the source code to check it out 
        var products = await orderDetails.Products
            .SelectAsync(async p => new ProductQuantityName(p.ProductId, p.Quantity,
                await _productNameService.GetProductName(p.ProductId, cancellationToken)));
        
        return new OrderResponse(orderDetails.Id, orderDetails.Status.ToString(), orderDetails.Delivery,
            orderDetails.PaymentInformation, products.ToList());
    }
}

 

Ahora para hacer un test manual, debemos ejecutar como mínimo la api de lectura de Products y la api de Orders, Pero para hacer un test manual completo, debemos ejecutar los siguientes proyectos:

  • Distribt.Services.Orders
  • Distribt.Services.Consumer
  • Distribt.Services.Products.Api.Read
  • Distribt.Services.Products.Api.Write
  • Distribt.Services.Products.Consumer

 

Así lo que hacemos es lo siguiente, probamos que todo funciona correctamente creando una orden de compra con un producto con el ID 1; El cual al hacer GetOrder nos devolverá algo como lo siguiente: 

get order ejemplo 1

Y ahora actualizamos dicho producto en el microservicio de productos y eventualmente (un segundo o dos) el nombre será actualizado en el microservicio de Orders y si hacemos otro GetOrder podremos ver el nombre actualizado.

 

actualizar nombre con consistencia eventual

 

 

Conclusión

En este post hemos visto qué es la consistencia eventual 

Cómo afecta a nuestra arquitectura el uso de consistencia eventual

Cómo implementar consistencia eventual en código

 


Uso del bloqueador de anuncios adblock

Hola!

Primero de todo bienvenido a la web de NetMentor donde podrás aprender programación en C# y .NET desde un nivel de principiante hasta más avanzado.


Yo entiendo que utilices un bloqueador de anuncios como AdBlock, Ublock o el propio navegador Brave. Pero te tengo que pedir por favor que desactives el bloqueador para esta web.


Intento personalmente no poner mucha publicidad, la justa para pagar el servidor y por supuesto que no sea intrusiva; Si pese a ello piensas que es intrusiva siempre me puedes escribir por privado o por Twitter a @NetMentorTW.


Si ya lo has desactivado, por favor recarga la página.


Un saludo y muchas gracias por tu colaboración

© copyright 2024 NetMentor | Todos los derechos reservados | RSS Feed

Buy me a coffee Invitame a un café