Uno de los patrones principales en los sistemas distribuidos es el patrón productor consumidor (producer/consumer) el cual se utiliza para la comunicación asíncrona entre microservicios.

 

 

Nota: el post es bastante largo, pero merece la pena si quieres implementar el patrón productor consumidor. Recuerda que todo este código, lo tienes funcionando en GitHub, con la implementación utilizando RabbitMQ, que veremos en el siguiente post.

 

 

1 - Qué es el patrón productor consumidor? 

El patrón productor consumidor (producers/consumers) en un patrón de diseño que designa los procesos para ser productores de recursos, en nuestro caso, mensajes. O consumidores de los mismos.

 

Cuando hablamos de software utilizamos productor consumidor en comunicación asíncrona, y normalmente será a través de un bus o colas para almacenar esos recursos de forma temporal.

comunicación asíncrona

1.1 - Message Bus

Para implementar la lógica productor consumidor normalmente utilizaremos un message bus, también llamado service bus

 

Esto es debido a que podemos tener múltiples consumidores para el mismo bus y ninguna de las aplicaciones es consciente de cuál es la aplicación que produce los mensajes o que otras aplicaciones están escuchando dicho bus. 

service bus

Un punto importante a tener en cuenta es que no hay garantía de que los mensajes sean consumidos en orden, esto quiere decir que si dos mensajes son generados puede ser que tengamos varios consumidores y no los reciban en el mismo orden.

 

Dependiendo de qué software utilizas para el service bus puede darse el caso de que si no hay ningún consumidor el mensaje se pierda (RabbitMQ) mientras que otros softwares almacenan dichos mensajes (Kafka).

Nota: en RabbitMQ puedes almacenar los mensajes si haces el binding a una cola directamente dentro de RabbitMQ.

 

1.2 - Colas de mensajes

La otra opción cuando utilizamos comunicación asíncrona es crear colas de mensajes.

Y pese a seguir la misma idea que el message bus, cuando utilizamos colas vamos a tener un único cliente de las mismas (un consumidor por cola). 

 

Y a diferencia de los message bus, un mensaje nunca se va de la cola (a no ser que tenga tiempo de expiración), por lo que necesitas almacenamiento (disco) para poder utilizarlas.

colas de mensajes

 

 

2 - Abstracción del patrón Productor Consumidor

El primer paso para conseguir tener un código eficiente es abstraer el patrón que vamos a utilizar. No la implementación de la tecnología a utilizar (rabbitMQ / Kafka) que lo haremos más tarde, sino el patrón en sí.

Para ello he creado una pequeña librería dentro de Distribt.Shared.Communication que nos va a permitir implementar más adelante la tecnología de comunicación que deseemos. 

 

2.1 - Mensajes

Lo primero que vamos a ver es qué tipo de mensaje vamos a enviar.

En muchas arquitecturas vamos a diferenciar entre mensajes de integración (IntegrationMessages) y mensajes de dominio (DomainMessages). 

Principalmente lo haremos para una división lógica y para que nuestra aplicación sea más fácil de comprender.

 

Otro motivo es que los mensajes de integración son los que van a ser generados desde tu domino para que otros servicios los consuman. Mientras que los mensajes de dominio son los que tu dominio genera que el propio dominio va a consumir.

 

Una forma de comprender este concepto muy sencilla es con un ejemplo. Cuando separamos las bases de datos de lectura y escritura.

Digamos que tenemos una base de datos de productos, dividida en dos, read y write. Cuando hacemos “Get producto” siempre lo hacemos de read. Mientras que cuando escribimos información lo hacemos en write.

Ello podría mostrar un flujo como el siguiente:

reader writer architecture

En este escenario, generamos un cambio de nombre en un producto, lo cambiamos en la base de datos de escritura. 

De ahí generamos un evento de dominio y lo publicamos al message bus el cual es consumido por el handler que a su vez, actualiza el elemento en la base de datos de lectura y genera el evento de integración “nombre producto cambiado” el cual será consumido por todos aquellos servicios que tengan una dependencia sobre dicho mensaje.

 

2.2 - Mensajes de dominio y Mensajes de integración

Ahora vamos a pasar a explicar los mensajes como tal. Para esta librería contamos tanto con IntegrationMessages cómo DomainMessages, para el ejemplo, ambos son iguales, pero en el futuro si implementamos DDD podríamos añadir propiedades como AggregateId al DomainMessage.

 

Y aquí es donde quería llegar, para los mensajes vamos a utilizar composición sobre herencia (composition over inheritance).

El motivo por el que lo hacemos así es basándome en mi experiencia personal, lo más común es ir a empresas y que utilicen herencia para los mensajes, tanto de dominio como de integración.

 

El problema es que hacer eso es en la gran mayoría de las veces, es una chapuza, ya que el mensaje necesita tener muchos más elementos (propiedades) de los necesarios, elementos que han de ser añadidos y mantenidos por los desarrolladores que hacen los microservicios:

public class DomainMessage 
{
    public string MessageIdentifier { get;set; }
    public string CorrelationId { get; set; }
    public string MessageName { get; set; }
    public DateTime CreatedTimeUtc { get; set; } 
}

public class SubscriptionDomainMessage : DomainMessage
{
    public string Email { get; set; }
}

Debido a toda esta administración, los mensajes acaban con la mitad de propiedades con valores null, o asignadas en cualquier punto de la aplicación, estando erróneas, etc, porque básicamente es una pesadilla, cada vez que cambia la liberia todo el mundo tiene que actualizar y en la práctica, no pasa.

 

Así que por ese motivo nosotros vamos a ir por otra ruta, vamos a permitir que el usuario se centre únicamente en el mensaje que quiere enviar y su contenido. 

en este caso, subscription, de hecho, no necesitamos que se llame subscriptionDomainMessage, simplemente con SubscriptionDto nos srive.

public class SubscriptionDto
{
    public string Email { get; set; }
}

Pero para ello, debemos añadir complejidad a nuestra librería, no mucha la verdad, pero un poco, y a la larga, créeme que merece la pena.

De primeras, vamos a separar lógicamente lo que necesitamos, por ejemplo, sabemos que en el futuro utilizaremos correlationId para la observabilidad y mantener la información sobre cuando el mensaje fue creado también puede ser valioso. esa información la pondremos en una clase llamada Metadata

public record Metadata
{
    public string CorrelationId { get; }
    public DateTime CreatedUtc { get; }

    public Metadata(string correlationId, DateTime createdUtc)
    {
        CorrelationId = correlationId;
        CreatedUtc = createdUtc;
    }
}

 

Aplicamos una lógica similar para “que debería contener el mensaje”, información básica, que no es “metadata” pero están más relacionadas a un mensaje, para ello crearemos una interfaz, llamada IMessage, con esta información.

Además, utilizaremos dicha clase en el futuro, para identificar que un mensaje es de hecho un mensaje de dominio o de integración. 

public interface IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }
}

 

Y crearemos DomainMessage e IntegrationMessage basados en el:

public record DomainMessage : IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }

    public DomainMessage(string messageIdentifier, string name)
    {
        MessageIdentifier = messageIdentifier;
        Name = name;
    }
}

public record IntegrationMessage : IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }
    public IntegrationMessage(string messageIdentifier, string name)
    {
        MessageIdentifier = messageIdentifier;
        Name = name;
    }
}

Recuerda, si quieres poner alguna información adicional, como AggregateId, lo puedes hacer dentro de DomainMessage/IntegrationMessage.

 

Finalmente, únicamente nos queda ponerlo todo bien juntito, y añadir el elemento principal, el mensaje que quiere mandar el usuario. Creamos dichas clases con un tipo genérico , el cual es el tipo que el usuario va a enviar: 

public record IntegrationMessage<T> : IntegrationMessage
{
    public T Content { get; }
    public Metadata Metadata { get; }

    public IntegrationMessage(string messageIdentifier, string name, T content, Metadata metadata)
        : base(messageIdentifier, name)
    {
        Content = content;
        Metadata = metadata;
    }
}

public record DomainMessage<T> : DomainMessage
{
    public T Content { get; }
    public Metadata Metadata { get; }

    public DomainMessage(string messageIdentifier, string name, T content, Metadata metadata)
        : base(messageIdentifier, name)
    {
        Content = content;
        Metadata = metadata;
    }
}  

Con todo esto, el usuario/desarrollador debe ocuparse únicamente de T, mientras que del resto de elementos va a ser la librería la que se encargue. 

Ahora solo nos queda enviarlos y recibirlos.

 

2.3 - Publicar mensajes

La gran mayoría de la lógica de enviar mensajes sucede en la abstracción que creemos para el servicio que utilicemos, ya sea kafka, mosquitto o en nuestro caso RabbitMQ, pese a ello, necesitamos algo en este proyecto. 

 

Crearemos una interfaz la cual será implementada en dicha abstracción, un método para publicar un único mensaje y otro para publicar una lista.

public interface IExternalMessagePublisher<in TMessage>
    where TMessage : IMessage
{
    Task Publish(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default);
    Task PublishMany(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default);
}

 

Como puedes observar estamos enviando TMessage, que en nuestro caso será o bien IntegrationMessage<T> o DomainEvent<T>, asi como RoutingKey que es una propiedad que se asigna en la cabecera del mensaje para identificar tipos de mensajes, etc 

Veremos más sobre RoutingKey en la implementación con RabbitMQ.

Nota: los ejemplos de código están únicamente en los mensajes de integración, pero para publicar mensajes de dominio hacemos lo mismo.

 

Y si esa interfaz es la que utiliza la abstracción, qué es lo que van a utilizar los desarrolladores cuando quieran enviar un mensaje?

 

La siguiente interfaz: 

public interface IIntegrationMessagePublisher
{
    Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);
    Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);
}

En la que tenemos también dos métodos, similar a la interfaz anterior.

 

Ahora queda juntarlo todo, para ello creamos una clase que implementa nuestra interfaz IIntegrationMessagePublisher y ejecuta IExternalMessagePublisher<IntegrationMessage> :

public class DefaultIntegrationMessagePublisher : IIntegrationMessagePublisher
{
    private readonly IExternalMessagePublisher<IntegrationMessage> _externalPublisher;

    public DefaultIntegrationMessagePublisher(IExternalMessagePublisher<IntegrationMessage> externalPublisher)
    {
        _externalPublisher = externalPublisher;
    }

    public Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)
    {
        Metadata calculatedMetadata = CalculateMetadata(metadata);
        var integrationMessage = IntegrationMessageMapper.MapToMessage(message, calculatedMetadata);
        return _externalPublisher.Publish(integrationMessage, routingKey, cancellationToken);
    }

    public Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)
    {
        var integrationMessages =
            messages.Select(a => IntegrationMessageMapper.MapToMessage(a, CalculateMetadata(metadata)));
        return _externalPublisher.PublishMany(integrationMessages, routingKey, cancellationToken);
    }

    private Metadata CalculateMetadata(Metadata? metadata)
    {
        return metadata ?? new Metadata(Guid.NewGuid().ToString(), DateTime.UtcNow);
    }
}

 

Finalmente solo nos queda incluirlo en el contenedor de dependencias.

public static void AddPublisher<TMessage>(this IServiceCollection serviceCollection)
{
    if (typeof(TMessage) == typeof(IntegrationMessage))
    {
        serviceCollection.AddIntegrationBusPublisher();
    }
    else if (typeof(TMessage) == typeof(DomainMessage))
    {
        serviceCollection.AddDomainBusPublisher();
    }
}

private static void AddIntegrationBusPublisher(this IServiceCollection serviceCollection)
{
    serviceCollection.AddTransient<IIntegrationMessagePublisher, DefaultIntegrationMessagePublisher>();
}


private static void AddDomainBusPublisher(this IServiceCollection serviceCollection)
{
    serviceCollection.AddTransient<IDomainMessagePublisher, DefaultDomainMessagePublisher>();
}

 

Una vez incluido en el contenedor de dependencias, podemos inyectar en cualquier servicio y utilzarlo. Por ejemplo si inyectamos IIntegrationMessagePublisher _publishser podemos ejecutar el siguiente código:

await publisher.Publish(subscriptionDto);

Y enviara el mensaje subscriptionDto envuelto en un IntegrationMessage<SubscriptionDto>.

 

2.4 - Consumir mensajes

Consumir mensajes es algo más complejo, ya que requiere no solo de consumirlos, sino además de procesarlos (handle). 

Vamos a empezar igual que antes, con la interfaz que implementará la abstracción del servicio:

public interface IMessageConsumer
{
    Task StartAsync(CancellationToken cancelToken = default);
}

public interface IMessageConsumer<T> : IMessageConsumer
{
}

 

La interfaz IMessageConsumer<T> es la que utilizará RabbitMQ.

 

¿Pero cómo vamos a consumir dichos mensajes? 

Es común ver aplicaciones de consola que únicamente tienen un código del tipo _servicio.consume<T>() o similar. Esto tiene varios problemas, el primero y obvio, que solo consume un tipo de mensaje, y el segundo es que no tiene interacción con el usuario, en caso de que quieras parar el consumer, o reiniciarlo, empezar desde una fecha en concreto, etc.

 

En mi caso, voy a proponer una solución alternativa, utilizar una WebAPI con un IHostedService

Recuerda que la interfaz IHostedService es una interfaz que nos provee microsoft para correr tareas en segundo plano (background tasks).

 

Para ello, necesitamos, obviamente un hostedService, el cual se va a encargar de iniciar, parar e invocar IMessageConsumer

public class ConsumerHostedService<TMessage> : IHostedService
{
    private readonly IConsumerManager<TMessage> _consumerManager;
    private readonly IMessageConsumer<TMessage> _messageConsumer;
    private readonly CancellationTokenSource _stoppingCancellationTokenSource =
        new CancellationTokenSource();
    private Task? _executingTask;

    public ConsumerHostedService(IConsumerManager<TMessage> consumerManager, IMessageConsumer<TMessage> messageConsumer)
    {
        _consumerManager = consumerManager;
        _messageConsumer = messageConsumer;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _executingTask = ConsumeMessages(_stoppingCancellationTokenSource.Token);

        return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;    
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _stoppingCancellationTokenSource.Cancel();
        _consumerManager.StopExecution();
        return  Task.CompletedTask;
    }

    private async Task ConsumeMessages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var ct = _consumerManager.GetCancellationToken();
            if (ct.IsCancellationRequested) break;
            try
            {
                await _messageConsumer.StartAsync(cancellationToken);
            }catch (OperationCanceledException)
            {
                //the operation is getting cancelled, ignore.
            }
        }
    }
}

 

Para controlar cuándo empezar, cuando parar, etc., utilizamos la interfaz IConsumerManager. Pero en este punto no podemos hacer nada, por ello, debemos crear un controlador, que actuará como “base” para dicha administración:

public class ConsumerController<TMessage> : Controller
{
    private readonly IConsumerManager<TMessage> _consumerManager;

    public ConsumerController(IConsumerManager<TMessage> consumerManager)
    {
        _consumerManager = consumerManager;
    }
    
    [HttpPut]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [Route("start")]
    public virtual IActionResult Start()
    {
        _consumerManager.RestartExecution();
        return Ok();
    }
} 

 

Lo añadimos al contenedor de dependencias:

public static void AddConsumer<TMessage>(this IServiceCollection serviceCollection)
{
    serviceCollection.AddSingleton<IConsumerManager<TMessage>, ConsumerManager<TMessage>>();
    serviceCollection.AddSingleton<IHostedService, ConsumerHostedService<TMessage>>();
}

 

y voila!, solo tenemos que heredar en un controller de nuestro ConsumerController para que funcione.

[ApiController]
[Route("[controller]")]
public class IntegrationConsumerController : ConsumerController<IntegrationMessage>
{
    public IntegrationConsumerController(IConsumerManager<IntegrationMessage> consumerManager) : base(consumerManager)
    {
    }
}

Nota: necesitaremos un controller para los mensajes de integración y otro para los mensajes de dominio.

 

2.5 - Procesar los mensajes

Hemos visto cómo publicar, como consumir mensajes y ahora debemos ver cómo procesarlos.

Una solución rápida (y mala) seria poner una lista interminable de ifs, en algún lugar de nuestro código, pero claro, si hiciéramos esto, no podríamos construir una librería ya que dichoS ifs estarían ubicados en el core de la misma (técnicamente se puede pasar con un delegado, pero hacer eso es un cagarro, igual que los ifs).

 

Entonces, ¿cómo lo hacemos? 

Sencillo, vamos a crear lo que se denominan Handlers, y utilizando reflection vamos a identificar, basándonos en el mensaje recibido cual es el handler a utilizar y ejecutaremos el mensaje contra dicho handler. 

public class HandleMessage  : IHandleMessage
{
    private readonly IMessageHandlerRegistry _messageHandlerRegistry;

    public HandleMessage(IMessageHandlerRegistry messageHandlerRegistry)
    {
        _messageHandlerRegistry = messageHandlerRegistry;
    }

    public Task Handle(IMessage message, CancellationToken cancellationToken = default)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));

        Type messageType = message.GetType();
        var handlerType = typeof(IMessageHandler<>).MakeGenericType(messageType);
        List<IMessageHandler> handlers = _messageHandlerRegistry.GetMessageHandlerForType(handlerType, messageType).ToList();

        foreach (IMessageHandler handler in handlers)
        {
            Type messageHandlerType = handler.GetType();
            
            MethodInfo? handle = messageHandlerType.GetMethods()
                .Where(methodInfo => methodInfo.Name == nameof(IMessageHandler<object>.Handle))
                .FirstOrDefault(info => info.GetParameters()
                    .Select(parameter => parameter.ParameterType)
                    .Contains(message.GetType()));
            
            if (handle != null) 
                return  (Task) handle.Invoke(handler, new object[] {message, cancellationToken})!;
        }
        return  Task.CompletedTask;
    }
}

La interfaz IMessageHandlerRegistry contiene una lista de los handlers disponibles en el sistema.

 

Ahora debemos definir las interfaces de ambos handlers, de los mensajes de integración y los mensajes de dominio. 

public interface IMessageHandler
{
}

public interface IMessageHandler<in TMessage> : IMessageHandler
{
    Task Handle(TMessage message, CancellationToken cancelToken = default(CancellationToken));
}

public interface IIntegrationMessageHandler : IMessageHandler
{
}

public interface IIntegrationMessageHandler<TMessage> 
    : IMessageHandler<IntegrationMessage<TMessage>>, IIntegrationMessageHandler
{
}

public interface IDomainMessageHandler : IMessageHandler
{
}

public interface IDomainMessageHandler<TMessage> 
    : IMessageHandler<IntegrationMessage<TMessage>>, IDomainMessageHandler
{
}

 

¿Qué nos permite este código? 

Muy sencillo, en nuestra aplicación podemos crear handlers de manera individual, como el siguiente: 

public class SubscriptionHandler : IIntegrationMessageHandler<SubscriptionDto>
{
    public Task Handle(IntegrationMessage<SubscriptionDto> message, CancellationToken cancelToken = default(CancellationToken))
    {
       Console.WriteLine($"Email {message.Content.Email} successfully subscribed.");
       return Task.CompletedTask;
    }
}

 

El cual se ejecutarán únicamente cuando consumamos un mensaje del tipo especificado

 

Conclusión

  • En este post hemos visto que es el patrón productor consumidor y cuáles son las opciones que tenemos a la hora de crear comunicación asíncrona.
  • Además, hemos visto una abstracción de este patrón escrita en C# que nos ayudará en el futuro.

En el siguiente post veremos RabbitMQ y su implementación.