Producer Consumer Pattern

One of the main patterns in distributed systems is the producer consumer pattern (producer/consumer), which is used for asynchronous communication between microservices.

 

 

Note: This post is quite long, but it's worth it if you want to implement the producer consumer pattern. Remember that all this code is working at GitHub, with the implementation using RabbitMQ, which we will see in the next post.

 

 

1 - What is the producer consumer pattern?

The producer consumer pattern (producers/consumers) is a design pattern that designates processes to be producers of resources—in our case, messages—or consumers of them.

 

When talking about software, we use producer consumer in asynchronous communication, and it is usually done through a bus or queues to store those resources temporarily.

asynchronous communication

We use the producer consumer pattern, for example, when we subscribe to a website, a newsletter, etc., where the service sends us an email.

 

For instance, you enter your email, and as soon as you click "subscribe," the server responds, "Thank you for subscribing," but two minutes later, you receive the email confirming your subscription.

This is because the producer consumer pattern is being used.

From the user's perspective, by clicking the button, their email has been registered and the message arrived in their inbox. But the process isn't exactly like that.

When you click, the software creates a message with the email and publishes it to our message broker, and immediately responds to the user that everything is okay.

 

But processing the message is still pending. For that, there's another system that will consume the message and send the confirmation email.

producers consumers use case

This is just one of many examples you'll find online. When you purchase products on many websites (such as Amazon), it works in the same way: it gives you a purchase ID and later confirms that everything is fine. But if something fails, it also notifies you.

 

1.1 - Message Bus

To implement the producer consumer logic, we typically use a message bus, also known as a service bus.

 

This is because we can have multiple consumers for the same bus, and none of the applications are aware of which app is producing the messages or which other apps are listening to the bus.

service bus

An important point to keep in mind is that there's no guarantee that messages will be consumed in order. This means that if two messages are generated, we may have several consumers and they might not receive them in the same order.

 

Depending on which software you use for the service bus, it may happen that if there are no consumers, the message is lost (RabbitMQ), while other software stores these messages (Kafka).

Note: In RabbitMQ you can store messages if you bind them directly to a queue in RabbitMQ.

 

1.2 - Message Queues

The other option when we use asynchronous communication is to create message queues.

And even though it follows the same idea as the message bus, when using queues we'll have a single client of the queue (one consumer per queue).

 

And unlike message buses, a message never leaves the queue (unless it has an expiration time), so you need storage (disk) to use them.

message queues

 

1.2.1 - Dead letter Queue

When processing messages, we hope everything goes well and works, but what happens when something goes wrong? Whether it's a bug or a part of the system is down.

 

Well, that's where dead letter queues come in. In our use case, we complete the message as long as the message is sent. If it's not, we'll have an error stating the reason, but what happens with the message?

 

Most commonly, the message goes back to the queue and ends up running in an infinite loop. As you can imagine, infinite loops are not a good idea. That's why, after X number of attempts, we should move the message to another queue, where it will be stored until another process (or manually) checks why that message failed, then fixes the bug if there is one, and puts the message back into the queue where it should have been handled.

dead letter queue explanation

 

2 - Abstraction of the Producer Consumer Pattern

The first step to achieve efficient code is to abstract the pattern you are going to use. Not the implementation of the technology to use (RabbitMQ / Kafka), which we'll do later, but the pattern itself.

For that, I've created a small library inside Distribt.Shared.Communication, which will allow us to implement the communication technology we want later on.

 

2.1 - Messages

The first thing we need to look at is what kind of message we're going to send.

In many architectures, we distinguish between integration messages (IntegrationMessages) and domain messages (DomainMessages).

Mainly, we'll do this for a logical separation and to make our application easier to understand.

 

Another reason is that integration messages are generated from your domain so that other services can consume them, whereas domain messages are those your domain produces that the domain itself is going to consume.

 

A simple way to understand this concept is with an example: when we separate read and write databases.

Let's say we have a products database, split into two, read and write. When we do "Get product," we always fetch it from read. While when we write information, we do so in write.

This could show a flow like the following:

reader writer architecture

In this scenario, we change the name of a product in the write database.

From there, we generate a domain event and publish it to the message bus, which is consumed by the handler, which in turn updates the item in the read database and generates the integration event "product name changed," which will be consumed by all services that have a dependency on that message.

 

2.2 - Domain Messages and Integration Messages

Now let's explain the messages themselves. For this library, we have both IntegrationMessages and DomainMessages. For this example, both are the same, but if we implement DDD in the future, we could add properties like AggregateId to DomainMessage.

 

And this is where I wanted to get: for the messages, we're going to use composition over inheritance.

The reason we do this is based on my personal experience. It's most common to go into companies and see them using inheritance for messages, both domain and integration messages.

 

The problem is that doing this is, in most cases, a mess, as the message needs to have many more elements (properties) than necessary, elements that have to be added and maintained by the developers working on the microservices:

public class DomainMessage {    public string MessageIdentifier { get;set; }    public string CorrelationId { get; set; }    public string MessageName { get; set; }    public DateTime CreatedTimeUtc { get; set; } }public class SubscriptionDomainMessage : DomainMessage{    public string Email { get; set; }}

Due to all this management, messages end up with half of their properties holding null values, or being assigned at random places in the app, being incorrect, and so on, because basically it's a nightmare—every time the library changes, everyone has to update, and in practice it never happens.

 

So, for that reason, we’ll go another route: we'll allow the user to focus only on the message they want to send and its content.

In this case, subscription. In fact, we don't need to call it subscriptionDomainMessage—simple SubscriptionDto works.

public class SubscriptionDto{    public string Email { get; set; }}

But for this, we need to add some complexity to our library—honestly, not that much—but it really pays off in the long run.

First, we'll logically separate what we need. For example, we know we'll use correlationId in the future for observability and keeping information on when the message was created can also be valuable. That information will be placed in a class called Metadata.

public record Metadata{    public string CorrelationId { get; }    public DateTime CreatedUtc { get; }    public Metadata(string correlationId, DateTime createdUtc)    {        CorrelationId = correlationId;        CreatedUtc = createdUtc;    }}

 

We'll apply similar logic to "what should the message contain"—basic information, not "metadata," but closely related to a message. For this, we'll create an interface called IMessage with this information.

Additionally, we'll use that class in the future to identify whether a message is a domain or integration message.

public interface IMessage{    public string MessageIdentifier { get; }    public string Name { get; }}

 

And we'll create DomainMessage and IntegrationMessage based on this:

public record DomainMessage : IMessage{    public string MessageIdentifier { get; }    public string Name { get; }    public DomainMessage(string messageIdentifier, string name)    {        MessageIdentifier = messageIdentifier;        Name = name;    }}public record IntegrationMessage : IMessage{    public string MessageIdentifier { get; }    public string Name { get; }    public IntegrationMessage(string messageIdentifier, string name)    {        MessageIdentifier = messageIdentifier;        Name = name;    }}

Remember, if you want to add any extra info, such as AggregateId, you can do so inside DomainMessage / IntegrationMessage.

 

Finally, all that's left is putting everything together and adding the main item: the message the user wants to send. We create these classes with a generic type, which is the type the user will send:

public record IntegrationMessage<T> : IntegrationMessage{    public T Content { get; }    public Metadata Metadata { get; }    public IntegrationMessage(string messageIdentifier, string name, T content, Metadata metadata)        : base(messageIdentifier, name)    {        Content = content;        Metadata = metadata;    }}public record DomainMessage<T> : DomainMessage{    public T Content { get; }    public Metadata Metadata { get; }    public DomainMessage(string messageIdentifier, string name, T content, Metadata metadata)        : base(messageIdentifier, name)    {        Content = content;        Metadata = metadata;    }}  

With all this, the user/developer only needs to take care of T, as the rest will be handled by the library.

Now, all that's left is sending and receiving them.

 

2.3 - Publishing Messages

Most of the logic for sending messages happens in the abstraction we create for the service we use, whether it's Kafka, Mosquitto, or, as in our case, RabbitMQ. Yet we need something in this project.

 

We’ll create an interface that will be implemented in that abstraction, a method to publish a single message and one to publish a list.

public interface IExternalMessagePublisher<in TMessage>    where TMessage : IMessage{    Task Publish(TMessage message, string? routingKey = null, CancellationToken cancellationToken = default);    Task PublishMany(IEnumerable<TMessage> messages, string? routingKey = null, CancellationToken cancellationToken = default);}

 

As you can see, we're sending TMessage, which will either be IntegrationMessage<T> or DomainMessage<T>, plus RoutingKey, which is a property set in the message header to identify message types, and so on.

We'll see more about RoutingKey in the RabbitMQ implementation.

Note: Code examples are only with integration messages, but for sending domain messages we do the same.

 

If that interface is what the abstraction uses, what do developers use when they want to send a message?

 

The following interface:

public interface IIntegrationMessagePublisher{    Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);    Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default);}

With two methods, similar to the previous interface.

 

Now, let's put it all together. We create a class that implements our IIntegrationMessagePublisher interface and runs IExternalMessagePublisher<IntegrationMessage>:

public class DefaultIntegrationMessagePublisher : IIntegrationMessagePublisher{    private readonly IExternalMessagePublisher<IntegrationMessage> _externalPublisher;    public DefaultIntegrationMessagePublisher(IExternalMessagePublisher<IntegrationMessage> externalPublisher)    {        _externalPublisher = externalPublisher;    }    public Task Publish(object message, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)    {        Metadata calculatedMetadata = CalculateMetadata(metadata);        var integrationMessage = IntegrationMessageMapper.MapToMessage(message, calculatedMetadata);        return _externalPublisher.Publish(integrationMessage, routingKey, cancellationToken);    }    public Task PublishMany(IEnumerable<object> messages, Metadata? metadata = null, string? routingKey = null, CancellationToken cancellationToken = default)    {        var integrationMessages =            messages.Select(a => IntegrationMessageMapper.MapToMessage(a, CalculateMetadata(metadata)));        return _externalPublisher.PublishMany(integrationMessages, routingKey, cancellationToken);    }    private Metadata CalculateMetadata(Metadata? metadata)    {        return metadata ?? new Metadata(Guid.NewGuid().ToString(), DateTime.UtcNow);    }}

 

Finally, we just need to include it in the dependency container.

public static void AddPublisher<TMessage>(this IServiceCollection serviceCollection){    if (typeof(TMessage) == typeof(IntegrationMessage))    {        serviceCollection.AddIntegrationBusPublisher();    }    else if (typeof(TMessage) == typeof(DomainMessage))    {        serviceCollection.AddDomainBusPublisher();    }}private static void AddIntegrationBusPublisher(this IServiceCollection serviceCollection){    serviceCollection.AddTransient<IIntegrationMessagePublisher, DefaultIntegrationMessagePublisher>();}private static void AddDomainBusPublisher(this IServiceCollection serviceCollection){    serviceCollection.AddTransient<IDomainMessagePublisher, DefaultDomainMessagePublisher>();}

 

Once included in the dependency container, we can inject it into any service and use it. For example, if we inject IIntegrationMessagePublisher _publisher, we can run the following code:

await publisher.Publish(subscriptionDto);

And it will send the subscriptionDto message wrapped in an IntegrationMessage<SubscriptionDto>.

 

2.4 - Consuming Messages

Consuming messages is somewhat more complex, as it requires not only consuming them, but also processing (handling) them.

We'll start as before, with the interface that will implement the abstraction of the service:

public interface IMessageConsumer{    Task StartAsync(CancellationToken cancelToken = default);}public interface IMessageConsumer<T> : IMessageConsumer{}

 

The IMessageConsumer<T> interface is what RabbitMQ will use.

 

But how are we going to consume these messages?

It's common to see console applications with nothing but code like _service.consume<T>() or similar. This has several issues—the first and obvious is that it only consumes one type of message, and the second is that it has no user interaction. If you want to stop the consumer, reset it, start from a particular date, etc., it's not possible.

 

In my case, I propose an alternative solution: use a WebAPI with a IHostedService.

Remember, IHostedService is an interface from Microsoft for running background tasks.

 

For this, we obviously need a hostedService, which will start, stop, and call IMessageConsumer.

public class ConsumerHostedService<TMessage> : IHostedService{    private readonly IConsumerManager<TMessage> _consumerManager;    private readonly IMessageConsumer<TMessage> _messageConsumer;    private readonly CancellationTokenSource _stoppingCancellationTokenSource =        new CancellationTokenSource();    private Task? _executingTask;    public ConsumerHostedService(IConsumerManager<TMessage> consumerManager, IMessageConsumer<TMessage> messageConsumer)    {        _consumerManager = consumerManager;        _messageConsumer = messageConsumer;    }    public Task StartAsync(CancellationToken cancellationToken)    {        _executingTask = ConsumeMessages(_stoppingCancellationTokenSource.Token);        return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;        }    public Task StopAsync(CancellationToken cancellationToken)    {        _stoppingCancellationTokenSource.Cancel();        _consumerManager.StopExecution();        return  Task.CompletedTask;    }    private async Task ConsumeMessages(CancellationToken cancellationToken)    {        while (!cancellationToken.IsCancellationRequested)        {            var ct = _consumerManager.GetCancellationToken();            if (ct.IsCancellationRequested) break;            try            {                await _messageConsumer.StartAsync(cancellationToken);            }catch (OperationCanceledException)            {                //the operation is getting cancelled, ignore.            }        }    }}

 

To control when to start, stop, etc., we use the IConsumerManager interface. But at this point, we can't do anything, so we have to create a controller to act as the "base" for this administration:

public class ConsumerController<TMessage> : Controller{    private readonly IConsumerManager<TMessage> _consumerManager;    public ConsumerController(IConsumerManager<TMessage> consumerManager)    {        _consumerManager = consumerManager;    }        [HttpPut]    [ProducesResponseType(StatusCodes.Status200OK)]    [Route("start")]    public virtual IActionResult Start()    {        _consumerManager.RestartExecution();        return Ok();    }} 

 

We add it to the dependency container:

public static void AddConsumer<TMessage>(this IServiceCollection serviceCollection){    serviceCollection.AddSingleton<IConsumerManager<TMessage>, ConsumerManager<TMessage>>();    serviceCollection.AddSingleton<IHostedService, ConsumerHostedService<TMessage>>();}

 

And voila! We only need to inherit in a controller from our ConsumerController for it to work.

[ApiController][Route("[controller]")]public class IntegrationConsumerController : ConsumerController<IntegrationMessage>{    public IntegrationConsumerController(IConsumerManager<IntegrationMessage> consumerManager) : base(consumerManager)    {    }}

Note: we'll need a controller for both integration messages and domain messages.

 

2.5 - Processing Messages

We've seen how to publish and consume messages; now we need to see how to process them.

A quick (and bad) solution would be to have an endless list of if statements somewhere in our code. But clearly, if we did that, we couldn't build a library because those ifs would be located at its core (technically, you could pass them with delegates, but that's just as bad as ifs).

 

So, how do we do it?

Simple—we're going to create what's known as Handlers and, using reflection, identify, based on the incoming message, which handler to use, and execute the message against that handler.

public class HandleMessage  : IHandleMessage{    private readonly IMessageHandlerRegistry _messageHandlerRegistry;    public HandleMessage(IMessageHandlerRegistry messageHandlerRegistry)    {        _messageHandlerRegistry = messageHandlerRegistry;    }    public Task Handle(IMessage message, CancellationToken cancellationToken = default)    {        if (message == null) throw new ArgumentNullException(nameof(message));        Type messageType = message.GetType();        var handlerType = typeof(IMessageHandler<>).MakeGenericType(messageType);        List<IMessageHandler> handlers = _messageHandlerRegistry.GetMessageHandlerForType(handlerType, messageType).ToList();        foreach (IMessageHandler handler in handlers)        {            Type messageHandlerType = handler.GetType();                        MethodInfo? handle = messageHandlerType.GetMethods()                .Where(methodInfo => methodInfo.Name == nameof(IMessageHandler<object>.Handle))                .FirstOrDefault(info => info.GetParameters()                    .Select(parameter => parameter.ParameterType)                    .Contains(message.GetType()));                        if (handle != null)                 return  (Task) handle.Invoke(handler, new object[] {message, cancellationToken})!;        }        return  Task.CompletedTask;    }}

The interface IMessageHandlerRegistry contains a list of all available handlers in the system.

 

Now we need to define the interfaces for both handlers: for integration messages and domain messages.

public interface IMessageHandler{}public interface IMessageHandler<in TMessage> : IMessageHandler{    Task Handle(TMessage message, CancellationToken cancelToken = default(CancellationToken));}public interface IIntegrationMessageHandler : IMessageHandler{}public interface IIntegrationMessageHandler<TMessage>     : IMessageHandler<IntegrationMessage<TMessage>>, IIntegrationMessageHandler{}public interface IDomainMessageHandler : IMessageHandler{}public interface IDomainMessageHandler<TMessage>     : IMessageHandler<IntegrationMessage<TMessage>>, IDomainMessageHandler{}

 

What does this code allow us to do?

Very simple: in our application we can create handlers individually, like the following:

public class SubscriptionHandler : IIntegrationMessageHandler<SubscriptionDto>{    public Task Handle(IntegrationMessage<SubscriptionDto> message, CancellationToken cancelToken = default(CancellationToken))    {       Console.WriteLine($"Email {message.Content.Email} successfully subscribed.");       return Task.CompletedTask;    }}

 

Which will be executed only when we consume a message of the specified type.

 

Conclusion

  • In this post, we've seen what the producer consumer pattern is and what options we have when creating asynchronous communication.
  • We've also seen an abstraction of this pattern written in C# that will help us in the future.

In the next post, we will look at RabbitMQ and its implementation.

 

This post was translated from Spanish. You can see the original one here.
If there is any problem you can add a comment bellow or contact me in the website's contact form

© copyright 2025 NetMentor | Todos los derechos reservados | RSS Feed

Buy me a coffee Invitame a un café