Producer Consumer Pattern

One of the most common patterns in distributed systems is the producer/consumer pattern, which is used for asynchronous communication between microservices.

 

 

Note: this post is quite long, but it is worth reading if you want to implement the producer consumer pattern. All the code is available on GitHub and is implemented using RabbitMQ, which we will cover in the next post.

 

 

1 - What is the producer consumer pattern?

The producer consumer pattern (producers/consumers) is a design pattern that designates processes as producers of resources, in our case, messages, or as consumers of those resources.

 

In software, we use the producer consumer pattern for asynchronous communication, usually through a message bus or queues that store those resources temporarily.

[Figure: asynchronous communication]

We use the producer consumer pattern, for example, when we subscribe to a website, newsletter, etc., where that service sends us an email.

 

For example, you enter your email and as soon as you click “subscribe”, the server responds “Thank you for subscribing”, but two minutes later you receive an email confirming that you are subscribed.

This is because the producer consumer pattern is being used.

From the user's perspective, clicking means your email is registered and you received a message in your inbox. But that's not exactly what happens.

After clicking, the software creates a message with your email and publishes it to our message broker, quickly responding to the user that everything is fine.

 

But the message still needs to be processed. For this, another system will consume the message and send the confirmation email.
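The subscription flow just described can be sketched in a few lines. This is only an illustration under stated assumptions: the broker is replaced by an in-memory System.Threading.Channels channel, and SubscriptionRequested and Subscribe are hypothetical names that do not come from the library discussed later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-memory stand-in for the message broker (an assumption for this sketch).
var broker = Channel.CreateUnbounded<SubscriptionRequested>();
var sentEmails = new List<string>();

// Consumer: a separate loop that picks messages up later and "sends" the email.
Task consumer = Task.Run(async () =>
{
    await foreach (SubscriptionRequested message in broker.Reader.ReadAllAsync())
    {
        sentEmails.Add($"Confirmation sent to {message.Email}");
    }
});

// Producer: publish the message and answer the user without waiting for the email.
string response = await Subscribe("user@example.com");
Console.WriteLine(response);

// Closing the channel lets the consumer loop finish so the demo can exit.
broker.Writer.Complete();
await consumer;
Console.WriteLine(sentEmails[0]);

async Task<string> Subscribe(string email)
{
    await broker.Writer.WriteAsync(new SubscriptionRequested(email));
    return "Thank you for subscribing";
}

// Hypothetical message type used only in this example.
public record SubscriptionRequested(string Email);
```

The point to notice is that Subscribe returns as soon as the message is written; the confirmation is produced later, by a different piece of code.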

[Figure: producer consumer use case]

This is just one of many examples you can find on the web. When you buy products on many web pages (for example, Amazon), it works the same way. You are given an order ID and later you get a confirmation that everything is fine, but if something fails, you are also notified.

 

1.1 - Message Bus

To implement the producer consumer logic, we will usually use a message bus, also called a service bus.

 

This is because we can have multiple consumers for the same bus, and none of the applications are aware of which application produces messages or what other applications are listening to that bus.

[Figure: service bus]

An important point to keep in mind is that there is no guarantee that messages will be consumed in order. If two messages are published and there are several consumers, the messages may be processed in a different order than the one in which they were produced.

 

Depending on the software you use for the service bus, a message published while nothing is listening may be lost (RabbitMQ, when no queue is bound to the exchange), while other software stores those messages regardless (Kafka).

Note: RabbitMQ does keep the messages if a queue is bound to the exchange they are published to.
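As a rough illustration of the bus behaviour described in this section, here is a minimal in-memory sketch. InMemoryBus is a hypothetical class written for this example; it is not RabbitMQ, Kafka, or part of the library in this post.

```csharp
using System;
using System.Collections.Generic;

var bus = new InMemoryBus();

// No subscriber yet: this message is dropped, like a broker with nothing bound.
bus.Publish("first");

var emailService = new List<string>();
var auditService = new List<string>();
bus.Subscribe(emailService.Add);
bus.Subscribe(auditService.Add);

// Every subscriber gets its own copy; the publisher does not know who listens.
bus.Publish("second");

Console.WriteLine($"{emailService.Count} {auditService.Count}"); // prints "1 1"

// Hypothetical in-memory bus, for illustration only.
public class InMemoryBus
{
    private readonly List<Action<string>> _subscribers = new();

    public void Subscribe(Action<string> subscriber) => _subscribers.Add(subscriber);

    public void Publish(string message)
    {
        foreach (Action<string> subscriber in _subscribers)
        {
            subscriber(message);
        }
    }
}
```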

 

1.2 - Message Queues

The other option when using asynchronous communication is to create message queues.

While this follows the same idea as the message bus, a queue has a single client: each queue is read by one consumer.

 

And unlike message buses, a message stays in the queue until it is consumed (or until it expires, if it has an expiration time), so queues need storage (disk).
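A tiny sketch of that difference, using the standard Queue<T> collection as a stand-in for a real (disk-backed) message queue, which is an assumption made only for this example:

```csharp
using System;
using System.Collections.Generic;

var queue = new Queue<string>();
queue.Enqueue("order-1");
queue.Enqueue("order-2");

// Nothing is consuming yet, but both messages are still waiting in the queue.
Console.WriteLine(queue.Count);

// The single consumer takes messages one by one; each one is handled once.
var handled = new List<string>();
while (queue.TryDequeue(out var message))
{
    handled.Add(message);
}

Console.WriteLine(string.Join(", ", handled));
```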

[Figure: message queues]

 

1.2.1 - Dead letter Queue

When processing messages, we expect everything to go smoothly, but what happens when something goes wrong, whether because of a bug or because part of the system is down?

 

Well, that's where dead letter queues come in. In our use case, we consider a message completed only if the email is successfully sent. If it's not, we will have an error code explaining why. But what happens to the message?

 

The most common behavior is for the message to return to the queue and be retried, potentially in an infinite loop. As you can imagine, infinite loops are not a good idea. For this reason, after X attempts, we should move the message to another queue (the dead letter queue), where it will be stored until another process (or a person, manually) checks why the message failed, fixes the bug if necessary, and reinserts the message into its original queue.
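The retry-then-park logic can be sketched as follows. Everything here is an assumption made for illustration: the queues are in-memory collections, MaxAttempts stands for the "X" above, and QueuedMessage and SendConfirmation are hypothetical names. Real brokers usually offer this as a built-in feature.

```csharp
using System;
using System.Collections.Generic;

const int MaxAttempts = 3; // the "X" from the text; the value is an assumption

var mainQueue = new Queue<QueuedMessage>();
var deadLetterQueue = new List<QueuedMessage>();

mainQueue.Enqueue(new QueuedMessage("valid@example.com", 0));
mainQueue.Enqueue(new QueuedMessage("not-an-email", 0));

while (mainQueue.TryDequeue(out var message))
{
    try
    {
        SendConfirmation(message.Email);
    }
    catch (Exception)
    {
        var retried = message with { Attempts = message.Attempts + 1 };
        if (retried.Attempts >= MaxAttempts)
        {
            // Give up: park the message so someone can inspect it later.
            deadLetterQueue.Add(retried);
        }
        else
        {
            // Back to the main queue for another try.
            mainQueue.Enqueue(retried);
        }
    }
}

Console.WriteLine($"Dead-lettered: {deadLetterQueue.Count}");

// Hypothetical handler: fails for anything that does not look like an email.
static void SendConfirmation(string email)
{
    if (!email.Contains('@')) throw new FormatException($"Invalid email: {email}");
}

public record QueuedMessage(string Email, int Attempts);
```

The failing message is retried until it reaches MaxAttempts and then leaves the main queue, so the loop always terminates.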

[Figure: dead letter queue explanation]

 

2 - Abstraction of the Producer Consumer pattern

The first step to having efficient code is to abstract the pattern we are going to use. Not the implementation of the technology to be used (RabbitMQ / Kafka), which we will do later, but the pattern itself.

To this end, I have created a small library inside Distribt.Shared.Communication that will allow us to later implement the communication technology we want.

 

2.1 - Messages

The first thing we need to see is what type of messages we are going to send.

In many architectures, we distinguish between integration messages (IntegrationMessages) and domain messages (DomainMessages).

We do this mainly for a logical separation and to make our application easier to understand.

 

Another reason is that integration messages are generated from your domain for other services to consume, while domain messages are generated by your own domain for the domain itself to consume.

 

A simple way to understand this concept is with an example: separating read and write databases.

Let's say we have a product database, split into two: read and write. When we "Get product", we always read from the read database, while when we write information, we write to the write database.

This gives us a flow like this:

[Figure: reader writer architecture]

In this scenario, we trigger a name change in a product in the write database.

From there, we generate a domain event and publish it to the message bus. It is consumed by a handler, which updates the item in the read database and generates the integration event “product name changed”, which will be consumed by any service that depends on that message.
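To make the flow concrete, here is a simplified sketch. The two databases are dictionaries, the integration bus is a list, and the event names ProductNameUpdated and ProductNameChanged are assumptions chosen for this example only.

```csharp
using System;
using System.Collections.Generic;

var writeDb = new Dictionary<int, string> { [1] = "Old name" };
var readDb = new Dictionary<int, string> { [1] = "Old name" };
var integrationBus = new List<ProductNameChanged>();

// 1. The write side changes the name and raises a domain event.
writeDb[1] = "New name";
var domainEvent = new ProductNameUpdated(1, "New name");

// 2. The handler consumes the domain event, updates the read database
//    and publishes an integration event for other services.
HandleDomainEvent(domainEvent);

Console.WriteLine(readDb[1]);
Console.WriteLine(integrationBus.Count);

void HandleDomainEvent(ProductNameUpdated updated)
{
    readDb[updated.ProductId] = updated.Name;
    integrationBus.Add(new ProductNameChanged(updated.ProductId, updated.Name));
}

// Hypothetical event types; the names are assumptions.
public record ProductNameUpdated(int ProductId, string Name);  // domain event
public record ProductNameChanged(int ProductId, string Name);  // integration event
```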

 

2.2 - Domain and Integration Messages

Now let's explain the messages themselves. For this library, we have both IntegrationMessages and DomainMessages; in this example, both are the same, but in the future, if we implement DDD, we could add properties like AggregateId to the DomainMessage.

 

And here's where I wanted to get to: for messages, we are going to use composition over inheritance.

The reason for this, based on my personal experience, is that it's very common in companies to use inheritance for both domain and integration messages.

 

The problem is that most of the time, it ends up being a mess, since the message ends up having way more properties than necessary, elements that developers of the microservices must add and maintain:

public class DomainMessage
{
    public string MessageIdentifier { get; set; }
    public string CorrelationId { get; set; }
    public string MessageName { get; set; }
    public DateTime CreatedTimeUtc { get; set; }
}

public class SubscriptionDomainMessage : DomainMessage
{
    public string Email { get; set; }
}

In practice, all this management becomes a nightmare: messages end up with half of their properties set to null, or assigned in arbitrary places across the application, often incorrectly. And every time the library changes, every service needs to update, which in practice does not happen.

 

So, for that reason, we'll take a different path: we'll let the user focus only on the message they want to send and its content.

In this case, a subscription. In fact, we don't need to call it SubscriptionDomainMessage; just SubscriptionDto works.

public class SubscriptionDto
{
    public string Email { get; set; }
}

For this, however, we must add some complexity to our library, not much, but a little, and in the long run, it's worth it.

First, let's logically separate what we need. For example, in the future we'll use CorrelationId for observability, and keeping track of when the message was created may also be valuable. We'll put that info in a record called Metadata:

public record Metadata
{
    public string CorrelationId { get; }
    public DateTime CreatedUtc { get; }

    public Metadata(string correlationId, DateTime createdUtc)
    {
        CorrelationId = correlationId;
        CreatedUtc = createdUtc;
    }
}

 

We apply a similar logic for “what should a message contain?”, basic info that isn’t metadata, but more closely related to the message itself. For that, we’ll create an interface called IMessage with this information.

We'll also use this interface in the future to identify whether a message is a domain or integration message.

public interface IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }
}

 

And we'll create DomainMessage and IntegrationMessage based on this:

public record DomainMessage : IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }

    public DomainMessage(string messageIdentifier, string name)
    {
        MessageIdentifier = messageIdentifier;
        Name = name;
    }
}

public record IntegrationMessage : IMessage
{
    public string MessageIdentifier { get; }
    public string Name { get; }
    public IntegrationMessage(string messageIdentifier, string name)
    {
        MessageIdentifier = messageIdentifier;
        Name = name;
    }
}

Remember, if you want to add additional information, like AggregateId, you can do it inside DomainMessage/IntegrationMessage.

 

Finally, all we need is to put all this together and add the main element: the message the user wants to send. We create these classes with a generic type, which is the type the user is going to send:

public record IntegrationMessage<T> : IntegrationMessage
{
    public T Content { get; }
    public Metadata Metadata { get; }

    public IntegrationMessage(string messageIdentifier, string name, T content, Metadata metadata)
        : base(messageIdentifier, name)
    {
        Content = content;
        Metadata = metadata;
    }
}

public record DomainMessage<T> : DomainMessage
{
    public T Content { get; }
    public Metadata Metadata { get; }

    public DomainMessage(string messageIdentifier, string name, T content, Metadata metadata)
        : base(messageIdentifier, name)
    {
        Content = content;
        Metadata = metadata;
    }
}  

With all this, the user/developer only needs to worry about T, while the library takes care of the rest.
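As an illustration of how the pieces compose, the sketch below wraps a SubscriptionDto in an IntegrationMessage<SubscriptionDto>. The types are condensed with positional records to keep the example short, and using a new GUID as the identifier and the DTO type name as the message name are assumptions, not necessarily what the library does.

```csharp
using System;

var dto = new SubscriptionDto { Email = "user@example.com" };
var metadata = new Metadata(Guid.NewGuid().ToString(), DateTime.UtcNow);

// The library builds the envelope; the developer only supplied the DTO.
var message = new IntegrationMessage<SubscriptionDto>(
    Guid.NewGuid().ToString(), nameof(SubscriptionDto), dto, metadata);

Console.WriteLine($"{message.Name}: {message.Content.Email}");

// Condensed versions of the types shown in this section.
public class SubscriptionDto
{
    public string Email { get; set; } = "";
}

public record Metadata(string CorrelationId, DateTime CreatedUtc);

public interface IMessage
{
    string MessageIdentifier { get; }
    string Name { get; }
}

public record IntegrationMessage(string MessageIdentifier, string Name) : IMessage;

public record IntegrationMessage<T>(string MessageIdentifier, string Name, T Content, Metadata Metadata)
    : IntegrationMessage(MessageIdentifier, Name);
```

Note that SubscriptionDto knows nothing about identifiers, correlation, or timestamps; all of that lives in the envelope.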

Now all that's left is to send and receive them.

 

2.3 - Publishing messages

Most of the logic for sending messages happens in the abstraction we create for the service we use, whether it's Kafka, Mosquitto, or in our case RabbitMQ. Still, we need something in this project.

 

We will create an interface to be implemented in that abstraction, with one method to publish a single message and another to publish a list.

public interface IExternalMessagePublisher<in TMessage>
    where TMessage : IMessage
{
    Task Publish(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default);
    Task PublishMany(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default);
}

 

As you can see, we are sending TMessage, which in our case will be IntegrationMessage<T> or DomainMessage<T>, as well as a routingKey, a value attached to the message that the broker can use to identify message types and decide where to deliver them.

We'll learn more about RoutingKey in the RabbitMQ implementation.

Note: The code examples use integration messages only, but to publish domain messages we use the same approach.

 

And if that interface is what the abstraction uses, what do developers use when they want to send a message?

 

The following interface:

public interface IIntegrationMessagePublisher
{
    Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);
    Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);
}

We also have two methods here, similar to the previous interface.

 

Now let's bring it all together. For this, we create a class that implements our IIntegrationMessagePublisher interface and delegates to IExternalMessagePublisher<IntegrationMessage>:

public class DefaultIntegrationMessagePublisher : IIntegrationMessagePublisher
{
    private readonly IExternalMessagePublisher<IntegrationMessage> _externalPublisher;

    public DefaultIntegrationMessagePublisher(IExternalMessagePublisher<IntegrationMessage> externalPublisher)
    {
        _externalPublisher = externalPublisher;
    }

    public Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)
    {
        Metadata calculatedMetadata = CalculateMetadata(metadata);
        var integrationMessage = IntegrationMessageMapper.MapToMessage(message, calculatedMetadata);
        return _externalPublisher.Publish(integrationMessage, routingKey, cancellationToken);
    }

    public Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)
    {
        var integrationMessages =
            messages.Select(a => IntegrationMessageMapper.MapToMessage(a, CalculateMetadata(metadata)));
        return _externalPublisher.PublishMany(integrationMessages, routingKey, cancellationToken);
    }

    private Metadata CalculateMetadata(Metadata? metadata)
    {
        return metadata ?? new Metadata(Guid.NewGuid().ToString(), DateTime.UtcNow);
    }
}
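The IntegrationMessageMapper used above is not shown in this post. A minimal sketch of what such a mapper could look like follows; the body of MapToMessage, the choice of a new GUID as identifier, and the use of the type name as message name are all assumptions, and the message types are condensed into positional records to keep the example self-contained.

```csharp
using System;

var message = IntegrationMessageMapper.MapToMessage(
    new SubscriptionDto { Email = "user@example.com" },
    new Metadata(Guid.NewGuid().ToString(), DateTime.UtcNow));

// The runtime type is the closed generic IntegrationMessage<SubscriptionDto>.
Console.WriteLine(message.GetType() == typeof(IntegrationMessage<SubscriptionDto>));

// Hypothetical mapper: the real implementation may differ.
public static class IntegrationMessageMapper
{
    public static IntegrationMessage MapToMessage(object content, Metadata metadata)
    {
        // Close IntegrationMessage<T> over the runtime type of the content.
        Type messageType = typeof(IntegrationMessage<>).MakeGenericType(content.GetType());

        // Assumption: a new GUID as identifier, the content's type name as message name.
        object? instance = Activator.CreateInstance(
            messageType, Guid.NewGuid().ToString(), content.GetType().Name, content, metadata);

        return (IntegrationMessage)instance!;
    }
}

// Condensed versions of the message types from section 2.2.
public class SubscriptionDto
{
    public string Email { get; set; } = "";
}

public record Metadata(string CorrelationId, DateTime CreatedUtc);

public record IntegrationMessage(string MessageIdentifier, string Name);

public record IntegrationMessage<T>(string MessageIdentifier, string Name, T Content, Metadata Metadata)
    : IntegrationMessage(MessageIdentifier, Name);
```

Reflection is needed here because Publish receives an object, so the generic argument is only known at runtime.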

 

Finally, we just need to include it in the dependency container.

public static void AddPublisher<TMessage>(this IServiceCollection serviceCollection)
{
    if (typeof(TMessage) == typeof(IntegrationMessage))
    {
        serviceCollection.AddIntegrationBusPublisher();
    }
    else if (typeof(TMessage) == typeof(DomainMessage))
    {
        serviceCollection.AddDomainBusPublisher();
    }
}

private static void AddIntegrationBusPublisher(this IServiceCollection serviceCollection)
{
    serviceCollection.AddTransient<IIntegrationMessagePublisher, DefaultIntegrationMessagePublisher>();
}


private static void AddDomainBusPublisher(this IServiceCollection serviceCollection)
{
    serviceCollection.AddTransient<IDomainMessagePublisher, DefaultDomainMessagePublisher>();
}

 

Once it's included in the dependency container, we can inject it into any service and use it. For example, if we inject IIntegrationMessagePublisher as _publisher, we can execute the following code:

await _publisher.Publish(subscriptionDto);

And it will send the subscriptionDto message wrapped in an IntegrationMessage<SubscriptionDto>.

 

2.4 - Consuming messages

Consuming messages is a bit more complex, since it requires not just consuming, but also processing them (handling).

Let's start as before, with the interface to implement from the service abstraction:

public interface IMessageConsumer
{
    Task StartAsync(CancellationToken cancelToken = default);
}

public interface IMessageConsumer<T> : IMessageConsumer
{
}

 

The IMessageConsumer<T> interface is what RabbitMQ will use.

 

But how will we actually consume these messages?

It's common to see console apps that just have code like _service.consume<T>() or similar. This causes several issues. The most obvious is that it only consumes one type of message. Another is that it offers no way to interact with the consumer, in case you want to stop it, restart it, start from a certain date, etc.

 

In my case, I propose an alternative solution: use a WebAPI with an IHostedService.

Remember that the IHostedService interface is provided by Microsoft to run background tasks.

 

For this, we need a hosted service that will be in charge of starting, stopping, and invoking IMessageConsumer:

public class ConsumerHostedService<TMessage> : IHostedService
{
    private readonly IConsumerManager<TMessage> _consumerManager;
    private readonly IMessageConsumer<TMessage> _messageConsumer;
    private readonly CancellationTokenSource _stoppingCancellationTokenSource =
        new CancellationTokenSource();
    private Task? _executingTask;

    public ConsumerHostedService(IConsumerManager<TMessage> consumerManager, IMessageConsumer<TMessage> messageConsumer)
    {
        _consumerManager = consumerManager;
        _messageConsumer = messageConsumer;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _executingTask = ConsumeMessages(_stoppingCancellationTokenSource.Token);

        return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;    
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _stoppingCancellationTokenSource.Cancel();
        _consumerManager.StopExecution();
        return  Task.CompletedTask;
    }

    private async Task ConsumeMessages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var ct = _consumerManager.GetCancellationToken();
            if (ct.IsCancellationRequested) break;
            try
            {
                await _messageConsumer.StartAsync(cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // The operation is being cancelled; ignore.
            }
        }
    }
}

 

To control when to start, when to stop, etc., we use the IConsumerManager interface. On its own, though, nothing can call it from outside, so we need to create a controller, which will act as a “base” for this management:

public class ConsumerController<TMessage> : Controller
{
    private readonly IConsumerManager<TMessage> _consumerManager;

    public ConsumerController(IConsumerManager<TMessage> consumerManager)
    {
        _consumerManager = consumerManager;
    }
    
    [HttpPut]
    [ProducesResponseType(StatusCodes.Status200OK)]
    [Route("start")]
    public virtual IActionResult Start()
    {
        _consumerManager.RestartExecution();
        return Ok();
    }
} 

 

We add this to the dependency container:

public static void AddConsumer<TMessage>(this IServiceCollection serviceCollection)
{
    serviceCollection.AddSingleton<IConsumerManager<TMessage>, ConsumerManager<TMessage>>();
    serviceCollection.AddSingleton<IHostedService, ConsumerHostedService<TMessage>>();
}
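The ConsumerManager<TMessage> registered above is not shown in this post. Based only on the three members used so far (RestartExecution, StopExecution, and GetCancellationToken), a minimal version might look like the sketch below. The body is an assumption, it is not thread-safe, and the implementation in the repository may differ.

```csharp
using System;
using System.Threading;

var manager = new ConsumerManager<string>();
CancellationToken first = manager.GetCancellationToken();

// Stopping cancels the token that the running consumer is observing.
manager.StopExecution();
Console.WriteLine(first.IsCancellationRequested);

// Restarting hands out a fresh token that has not been cancelled.
manager.RestartExecution();
Console.WriteLine(manager.GetCancellationToken().IsCancellationRequested);

public interface IConsumerManager<TMessage>
{
    void RestartExecution();
    void StopExecution();
    CancellationToken GetCancellationToken();
}

// Hypothetical implementation, for illustration only.
public class ConsumerManager<TMessage> : IConsumerManager<TMessage>
{
    private CancellationTokenSource _cancellationTokenSource = new();

    public void RestartExecution()
    {
        // Swap in a new source first, then cancel whatever was running before.
        CancellationTokenSource previous = _cancellationTokenSource;
        _cancellationTokenSource = new CancellationTokenSource();
        previous.Cancel();
    }

    public void StopExecution() => _cancellationTokenSource.Cancel();

    public CancellationToken GetCancellationToken() => _cancellationTokenSource.Token;
}
```

The generic parameter is unused inside the class; it only exists so that the container can hold one manager per message kind (integration or domain).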

 

And voilà, we just need to inherit from ConsumerController in a controller for it to work.

[ApiController]
[Route("[controller]")]
public class IntegrationConsumerController : ConsumerController<IntegrationMessage>
{
    public IntegrationConsumerController(IConsumerManager<IntegrationMessage> consumerManager) : base(consumerManager)
    {
    }
}

Note: We will need a controller for integration messages and another for domain messages.

 

2.5 - Processing messages

We've seen how to publish and consume messages, and now we need to see how to process them.

A quick (and bad) solution would just be an endless list of if statements somewhere in our code. But if we did this, we couldn't build a library since those ifs would be located in the core of the library (technically, we could handle this with a delegate, but that's just as bad as the ifs).

 

So how do we do it?

It's simple: we'll create what are called Handlers. By using reflection, we'll identify, based on the received message, which handler to use, and execute the message against that handler.

public class HandleMessage  : IHandleMessage
{
    private readonly IMessageHandlerRegistry _messageHandlerRegistry;

    public HandleMessage(IMessageHandlerRegistry messageHandlerRegistry)
    {
        _messageHandlerRegistry = messageHandlerRegistry;
    }

    public Task Handle(IMessage message, CancellationToken cancellationToken = default)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));

        Type messageType = message.GetType();
        var handlerType = typeof(IMessageHandler<>).MakeGenericType(messageType);
        List<IMessageHandler> handlers = _messageHandlerRegistry.GetMessageHandlerForType(handlerType, messageType).ToList();

        foreach (IMessageHandler handler in handlers)
        {
            Type messageHandlerType = handler.GetType();
            
            MethodInfo? handle = messageHandlerType.GetMethods()
                .Where(methodInfo => methodInfo.Name == nameof(IMessageHandler<object>.Handle))
                .FirstOrDefault(info => info.GetParameters()
                    .Select(parameter => parameter.ParameterType)
                    .Contains(message.GetType()));
            
            if (handle != null) 
                return  (Task) handle.Invoke(handler, new object[] {message, cancellationToken})!;
        }
        return  Task.CompletedTask;
    }
}

The IMessageHandlerRegistry interface contains a list of the available handlers in the system.
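The registry itself is not shown in this post. Going only by the call GetMessageHandlerForType(handlerType, messageType) above, one possible shape is sketched below. The constructor, the filtering logic, and the Greeting and Farewell example types are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var registry = new MessageHandlerRegistry(
    new IMessageHandler[] { new GreetingHandler(), new FarewellHandler() });

Type messageType = typeof(Greeting);
Type handlerType = typeof(IMessageHandler<>).MakeGenericType(messageType);

// Only the handler that implements IMessageHandler<Greeting> is returned.
List<IMessageHandler> handlers = registry.GetMessageHandlerForType(handlerType, messageType).ToList();
Console.WriteLine(handlers.Count);

public interface IMessageHandler { }

public interface IMessageHandler<in TMessage> : IMessageHandler
{
    Task Handle(TMessage message, CancellationToken cancelToken = default);
}

public interface IMessageHandlerRegistry
{
    IEnumerable<IMessageHandler> GetMessageHandlerForType(Type handlerType, Type messageType);
}

// Hypothetical registry, for illustration only.
public class MessageHandlerRegistry : IMessageHandlerRegistry
{
    private readonly IReadOnlyList<IMessageHandler> _handlers;

    public MessageHandlerRegistry(IEnumerable<IMessageHandler> handlers) => _handlers = handlers.ToList();

    public IEnumerable<IMessageHandler> GetMessageHandlerForType(Type handlerType, Type messageType)
    {
        // messageType is unused in this sketch; it is kept to match the call shown above.
        return _handlers.Where(handler => handlerType.IsAssignableFrom(handler.GetType()));
    }
}

public record Greeting(string Text);
public record Farewell(string Text);

public class GreetingHandler : IMessageHandler<Greeting>
{
    public Task Handle(Greeting message, CancellationToken cancelToken = default) => Task.CompletedTask;
}

public class FarewellHandler : IMessageHandler<Farewell>
{
    public Task Handle(Farewell message, CancellationToken cancelToken = default) => Task.CompletedTask;
}
```

In a real application the list of handlers would more likely come from the dependency container than from a hand-built array.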

 

Now we need to define the interfaces for both kinds of handlers: those for integration messages and those for domain messages.

public interface IMessageHandler
{
}

public interface IMessageHandler<in TMessage> : IMessageHandler
{
    Task Handle(TMessage message, CancellationToken cancelToken = default(CancellationToken));
}

public interface IIntegrationMessageHandler : IMessageHandler
{
}

public interface IIntegrationMessageHandler<TMessage> 
    : IMessageHandler<IntegrationMessage<TMessage>>, IIntegrationMessageHandler
{
}

public interface IDomainMessageHandler : IMessageHandler
{
}

public interface IDomainMessageHandler<TMessage>
    : IMessageHandler<DomainMessage<TMessage>>, IDomainMessageHandler
{
}

 

What does this allow us to do?

Very simple: in our application, we can create handlers individually, like the following:

public class SubscriptionHandler : IIntegrationMessageHandler<SubscriptionDto>
{
    public Task Handle(IntegrationMessage<SubscriptionDto> message, CancellationToken cancelToken = default(CancellationToken))
    {
       Console.WriteLine($"Email {message.Content.Email} successfully subscribed.");
       return Task.CompletedTask;
    }
}

 

It will be executed only when we consume a message of the specified type.

 

Conclusion

  • In this post, we have seen what the producer consumer pattern is and the options we have for creating asynchronous communication.
  • We have also seen an abstraction of this pattern written in C# to help us in the future.

In the next post, we will look at RabbitMQ and its implementation.

 

This post was translated from Spanish. You can see the original one here.
If there is any problem, you can add a comment below or contact me through the website's contact form.


© Copyright 2025 NetMentor | All rights reserved
