Today I'm here to talk about a type in C# that most of you probably don't know; in fact, I didn't know it myself until a couple of years ago when I joined my current client. But explaining it now is perfect since it's related to what recently happened with the MediatR library since it can replace part of its functionality.
Index
In today's post, we'll take a look at the Channels type in C#.
1 - What is Channels in C#?
In C# there's a very specific type based on the concept of sending messages or events through a channel, which means that one part of the system creates an event and the other part receives it. These events can be of any type and are stored in the channel until the receiver consumes them.
If you think about it, this is a logic very similar to the producer consumer pattern, but in memory.
NOTE: If you're not familiar with the producer consumer pattern, I recommend reading the following post first: Pattern post Producer-Consumer.
But to sum up, it's a pattern in which one part of the system generates an event and another part consumes it. Both parts have no knowledge or information about each other and technically (even if not required for the system to work correctly) it doesn't matter who generates or consumes the event, consumers don't care who generates it, and producers don't care who consumes it. If you're used to distributed systems, it's like a message queue.
Channels let us implement this functionality, acting as a message queue if you're used to distributed systems in memory, with no extra infrastructure. This allows both sides of the system to be decoupled since there's no direct communication, and events can be processed asynchronously.
2 - What does this have to do with MediatR?
Some of you might be wondering what this has to do with using MediatR, since from what we've seen so far, channels have nothing to do with the mediator pattern. And that's true, but the MediatR library doesn't just implement the mediator pattern: it also implements notifications by its INotificationHandler<T>
, and this is where channels come in.
For this example, let's use the same scenario from the post about the mediator pattern, where an API endpoint receives a request to update a product through an endpoint which calls a handler that performs the logic and ends up creating a notification via MediatR. This notification is listened to by another part of the system, which then sends an email to all users who are
If you read that post, or the one I did about Core-Driven Architecture, you know I'm not a huge fan of using MediatR, but I've always found notifications to be really useful to decouple different parts of a system.
So here we're going to see how to remove MediatR from our system by replacing it completely with features already available in .NET.
For the mediator pattern for use cases, or handlers, we'll remove them entirely and just directly invoke the code. You might think this couples the code, and technically that's correct, but the code is logically coupled anyway, so the difference is basically zero.
If you use the MediatR pipeline with different behaviours, you can always replace them with middlewares and filters; the function is basically the same.
For notifications, we’ll use channels, and in the next point we’ll see how to set it up.
Which results in the following diagram:
3 - Implementing the producer consumer pattern with Channels
As always for this example, the code is on GitHub, but my aim is that it’s easy to follow.
First, let’s mention that we have an input DTO, which is ItemDto
public record ItemDto(int Id, decimal Price, string Title);
NOTE: In this example, to save time, it's what I'll insert into the data layer, but in a real application, it should be an entity, since they are different types.
A very simple endpoint, which just calls the use case to update the product:
app.MapPut("items", async (UpdateItem updateItem, ItemDto itemDto) =>
{
return await updateItem.Execute(itemDto);
});
And for now, the use case where we read from the database and update the value:
public class UpdateItem (IDatabaseRepository databaseRepository)
{
public async Task<bool> Execute(ItemDto itemToUpdate)
{
if (itemToUpdate.Title.Length > 200)
throw new Exception("Title must be less than 200 characters");
if (itemToUpdate.Price <= 0)
throw new Exception("It can't be free");
await databaseRepository.UpdateItem(itemToUpdate.Id, itemToUpdate.Price, itemToUpdate.Title);
return true;
}
}
Note: The database layer is not implemented, this is just to show how it works.
3.1 - Generating events with Channels
Let's imagine a situation where every time a product is updated and its price is 30% lower than before, we want to generate a notification, maybe an email to customers like Steam does, or a push notification, it doesn’t matter. What matters for us is that we have to generate a notification and obviously we want this to happen asynchronously, we don't want to be waiting for that process to finish before we can continue.
For this, in the MediatR post we saw that we could use the INotification type, though there are other solutions such as using an app like hangfire.
But in our current scenario, what we have to do is import using System.Threading.Channels
and add the channel in the use case constructor and use it to write to the channel:
public class UpdateItem (IDatabaseRepository databaseRepository,
Channel<ItemUpdated> channel 👈
)
{
public async Task<bool> Execute(ItemDto itemToUpdate)
{
if (itemToUpdate.Title.Length > 200)
throw new Exception("Title must be less than 200 characters");
if (itemToUpdate.Price <= 0)
throw new Exception("It can't be free");
ItemDto originalItem = await databaseRepository.GetItemById(itemToUpdate.Id);
await databaseRepository.UpdateItem(itemToUpdate.Id, itemToUpdate.Price, itemToUpdate.Title);
decimal percentageDifference = ((itemToUpdate.Price - originalItem.Price) / originalItem.Price) * 100;
if (percentageDifference <= -30)
{
await channel.Writer.WriteAsync(new ItemUpdated() 👈
{
Id = itemToUpdate.Id,
NewPrice = itemToUpdate.Price,
NewTitle = itemToUpdate.Title,
OldPrice = originalItem.Price,
OldTitle = originalItem.Title
});
}
return true;
}
}
And don't forget to add it to the dependency container. Here, it's not just about adding it, but you have two options: create bounded or unbounded channels.
The difference is simple: bounded allows you to set a maximum number of items in the channel, while unbounded has no limit, except for the system memory.
Generally, I configure them as unbounded, but if you use limits, you also have to configure what to do when the channel is full. There are several options.
Wait
: wait until there's space.DropWrite
: stop writing, the message you're trying to write is discarded.DropOldest
: remove the oldest message and add the new one.DropNewest
: remove the newest message and add the new one.
Depending on the type of system you're building, some options will fit your needs, or simply leave it unlimited if you're not going to have memory problems, which nowadays is quite common.
This would be the implementation for a Channel with a capacity of 100 items where we wait for previous events to release space:
BoundedChannelOptions options = new BoundedChannelOptions(100) {
FullMode = BoundedChannelFullMode.Wait
};
builder.Services.AddSingleton(Channel.CreateBounded<ItemUpdated>(options));
3.2 - Consuming events with Channels
Finally, we just need to consume the event. This is done with a background task, in other words, a native .NET BackgroundService
public class ItemUpdatedWorker(Channel<ItemUpdated> channel) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
while (channel.Reader.TryRead(out ItemUpdated? item))
{
Console.WriteLine($"Item {item.Id} has been updated. Old price: {item.OldPrice}, " +
$"New price: {item.NewPrice}. Old title: {item.OldTitle}, " +
$"New title: {item.NewTitle}");
}
}
}
}
Don't forget to add it to the dependency container:
builder.Services.AddHostedService<ItemUpdatedWorker>();
As you can see, we just read from the channel, in this example I'm printing the values to the console, but this background worker could do any necessary action in the background.
If there is any problem you can add a comment bellow or contact me in the website's contact form