When we begin developing distributed systems and search the internet for information, a whole bunch of new terms start appearing. One of them is CQRS, which we are going to review today.
1 - What is the CQRS Pattern?
CQRS is a software design pattern that shows us how to separate our application's logic for reading and writing operations.
This separation applies both to the code and execution of the program and to the physical location of the data.
CQRS stands for "Command Query Responsibility Segregation", which refers to separating the responsibility of queries (reads) from that of commands (writes/updates).
The CQRS pattern was not seen much before the arrival of microservices, since it was common to have a monolith that did everything, including connecting to a single database. However, in today's world, distributed architectures are gaining popularity.
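Before moving on, here's a minimal sketch of the separation in C# (hypothetical interfaces, purely to illustrate the split; they're not part of the Distribt code):

// Write side: commands mutate state and return, at most, an acknowledgement.
public interface IProductCommands
{
    Task Create(string name, string description, int stock, decimal price);
    Task UpdateDetails(int id, string name, string description);
}

// Read side: queries return data and never mutate state.
public interface IProductQueries
{
    Task<string> GetProduct(int id); // e.g. a serialized view of the product
}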
1.1 - CQRS Advantages
Using the CQRS pattern brings many advantages, but let's focus mainly on independent scalability.
This means you can scale a specific part of the application. Let's see an example.
If you have products, the commands will be to create and update products, while the queries will be to read those products.
If we think about it for a second, when we browse websites, we're constantly going in and out of product pages. You enter a store, check out 45 products, discard half, and revisit them all during your research.
And you keep doing that until you finally purchase a product.
During this selection process, we've done a ton of reads but only one modification, and that's only if we actually buy something.
If we scale this scenario to thousands or even millions of users in our web store, we can quickly see the benefits of implementing CQRS.
So if we notice our website slowing down, we can always add resources to the server handling the reads or increase the number of pods in Kubernetes behind an API Gateway, and so on.
Here are four key points that will affect the implementation of the pattern:
- Using CQRS is often tied to Event Sourcing in the write database. But we'll cover Event Sourcing in another post, since they're different patterns, even though they're often explained together (alongside DDD).
  - Note: the write/read databases can be physically separated or not, depending on your use case/load.
- You need to synchronize the write database with the read database to prevent inconsistencies. For this, you should use a service bus.
- If your project is small or you expect about the same number of reads as writes, all logic is usually in the same codebase using MediatR (in C#), serving as an in-memory service bus, since managing multiple microservices adds extra complexity and may not always be worthwhile (there's a sketch of this below).
- On the read side, you have two options: maintain table copies or have the data pre-built for what the user needs, so when you perform a "get", it's almost an instant ID lookup.
The choice between the above scenarios, such as using pre-built views for the gets or splitting into two applications, depends exclusively on your use case and your company. Both solutions have benefits and drawbacks.
My personal recommendation is to write clean and well-structured code. This way, if you ever need to switch approaches because your company grows or doesn’t reach expectations, migration will be easy.
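To illustrate the MediatR option from the list above, here's a minimal sketch (the types are hypothetical; it assumes the MediatR NuGet package is registered in DI):

using MediatR;

// Queries and commands stay separate types even though everything runs in one process;
// MediatR dispatches them in memory instead of going through a real service bus.
public record GetProductQuery(int Id) : IRequest<ProductDto>;
public record ProductDto(int Id, string Name);

public class GetProductQueryHandler : IRequestHandler<GetProductQuery, ProductDto>
{
    public Task<ProductDto> Handle(GetProductQuery query, CancellationToken cancellationToken)
    {
        // A real handler would hit the read store; hardcoded here for brevity.
        return Task.FromResult(new ProductDto(query.Id, "sample product"));
    }
}

// Dispatching from a controller or endpoint that received an IMediator via DI:
// ProductDto product = await mediator.Send(new GetProductQuery(1));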
2 - Implementing CQRS
When implementing CQRS and separating applications, there is no reason why you have to use the same kind of databases. Can you? Of course, but nothing forces you to do so.
In fact, it's very common (as in our product example) to use a relational database for writes and a non-relational (NoSQL) one for reads.
Another common example, if you work entirely with MongoDb, is to have the write database as the "master" and read from "replicas" (replica sets).
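With the .NET MongoDB driver, that could look roughly like this (a sketch; the connection string and replica-set name are assumptions):

using MongoDB.Driver;

// Writes always go to the primary; this setting allows reads to hit the secondaries.
var settings = MongoClientSettings.FromConnectionString(
    "mongodb://localhost:27017/?replicaSet=rs0"); // replica-set name is an assumption
settings.ReadPreference = ReadPreference.Secondary;

var client = new MongoClient(settings);
var database = client.GetDatabase("distribt");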
For example, if you have products with descriptions, price, and stock, these are three elements that "have nothing to do with each other." What I mean is that the product description won't change when you update the price, and updating the price won't change the description.
These are clearly different entities (and that’s not even considering maintaining a history), so they shouldn’t be in the same table. And possibly, they should be in different services too.
But when we query the product, we want to see all that information, so we aggregate it into a single row in the NoSQL database.
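To make that concrete, the read side stores one pre-built, denormalized view per product, roughly this shape (a sketch; the ProductView name is hypothetical, and you'll see the post's actual FullProductResponse record in section 3):

// One denormalized document per product: a "get" becomes a single ID lookup, no joins.
public record ProductDetails(string Name, string Description);
public record ProductView(int Id, ProductDetails Details, int Stock, decimal Price);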
- Note: I’ll publish a post/video about the differences between SQL and NoSQL in the future.
If you didn’t do it this way, you’d have a few options for getting all that info: a query with lots of inner joins, many API calls within your backend, or many calls to different APIs from your frontend.
This is another spot where there’s no single right answer—depending on the use case, sometimes a table acting as a "view" is best, and other times, many API calls are preferable.
Before wrapping up this part, note that in the previous image there's no "save" button. That's because commands represent single actions the user can take. As you see on modern websites, when you edit information, you usually don’t have to scroll to the bottom to save—the changes update instantly, either automatically or through a modal dialog.
The idea is the same: we only edit the needed information.
This way, your API becomes an intermediary layer between the frontend/client and the action to execute.
2.1 - Setting Up the Infrastructure
As always, this code is available on GitHub in the Distribt project.
- Note: to simplify, in this example there is only a product microservice. If I get feedback or see confusion, I’ll add the other two.
To better reflect a real-world production application, I’m splitting the read and write APIs. This means creating two new projects, and also a consumer, which will consume the events we send to the service bus.
I’ll also add an extra layer for business logic. This layer could be split into several projects, but for this example, it’s not necessary.
As always, all this needs to be configured in the services we’ve seen so far. In our case, just add the information to Consul:
docker exec -it consul consul services register -name=ProductsApiWrite -address=http://localhost -port=50320
docker exec -it consul consul services register -name=ProductsApiRead -address=http://localhost -port=50321
Now, I’ll add both MySql and MongoDb to our docker-compose file. MySql is for write operations, and MongoDb is for reading.
mongodb:
  image: mongo:latest
  container_name: MongoDb
  ports:
    - 27017:27017
  environment:
    - MONGO_INITDB_ROOT_USERNAME=distribtUser
    - MONGO_INITDB_ROOT_PASSWORD=distribtPassword
    - MONGO_INITDB_DATABASE=distribt
  volumes:
    - ./tools/mongodb/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro

mysql:
  image: mysql:8.0
  container_name: MySql
  environment:
    MYSQL_DATABASE: 'distribt'
    MYSQL_USER: 'distribtUser'
    MYSQL_PASSWORD: 'distribtPassword'
    MYSQL_ROOT_PASSWORD: 'distribtRootPassword'
  ports:
    - 3306:3306
  volumes:
    - ./tools/mysql/init.sql:/docker-entrypoint-initdb.d/init.sql
Here’s the content of the mongo file (mongo-init.js):
db.createUser({
  user: "distribtUser",
  pwd: "distribtPassword",
  roles: [{ role: "readWrite", db: "distribt" }],
  mechanisms: ["SCRAM-SHA-1"]
});

db.createCollection("Products");

db.Products.insertOne({ "Id": 1, "Details": { "Name": "Producto 1", "Description": "The description says this is the first product" }, "Stock": 32, "Price": 10 });
db.Products.insertOne({ "Id": 2, "Details": { "Name": "Second product", "Description": "This is product number 2" }, "Stock": 3, "Price": 120 });
db.Products.insertOne({ "Id": 3, "Details": { "Name": "Third", "Description": "Third Times Are Never Good" }, "Stock": 10, "Price": 15 });
And for MySql (init.sql):
USE `distribt`;

CREATE TABLE `Products` (
  `Id` int NOT NULL AUTO_INCREMENT,
  `Name` VARCHAR(150) NOT NULL,
  `Description` VARCHAR(150) NOT NULL,
  PRIMARY KEY (`Id`)
) AUTO_INCREMENT = 1;

INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('1', 'Producto 1', 'The description says this is the first product');
INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('2', 'Second product', 'This is product number 2');
INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('3', 'Third', 'Third Times Are Never Good');
As you can see, both databases have a couple of sample records so you can try the example.
The configuration for both databases then goes into Vault and Consul:
# VAULT
## User&Pass for MongoDb
docker exec -it vault vault kv put secret/mongodb username=distribtUser password=distribtPassword
## User&Pass for MySql
docker exec -it vault vault kv put secret/mysql username=distribtUser password=distribtPassword

# CONSUL
docker exec -it consul consul services register -name=MySql -address=localhost -port=3306
docker exec -it consul consul services register -name=MongoDb -address=localhost -port=27017
Although, in the case of Vault, you could also add it as a secrets engine.
By now, we've built all the infrastructure needed to store information.
And if you remember from the post about the producer/consumer pattern, when we insert something into the write database, we create a domain message so the handler can read it.
To create that domain message, we need to create a service bus in RabbitMQ: both the internal queue for domain messages and the external one for other services to access the updated info.
"queues": [ {"name":"product-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}, {"name":"product-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}, {"name":"product-domain-queue","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}, {"name":"product-domain-queue.dead-letter","vhost":"/","durable":true,"auto_delete":false,"arguments":{}}],"exchanges": [ {"name":"products.exchange","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}},],"bindings": [ {"source":"products.exchange","vhost":"/","destination":"product-queue","destination_type":"queue","routing_key":"external","arguments":{}}, {"source":"products.exchange","vhost":"/","destination":"product-domain-queue","destination_type":"queue","routing_key":"internal","arguments":{}}]
In the code, we’ve also created the product-queue, but we're not using it in this example.
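If you're curious what publishing against that topology looks like at the lowest level, here's a hedged sketch with the raw RabbitMQ.Client API (Distribt wraps this behind its own publisher abstractions):

using System.Text;
using RabbitMQ.Client;

// The routing key decides which queue receives the message, per the bindings above.
using var connection = new ConnectionFactory { HostName = "localhost" }.CreateConnection();
using var channel = connection.CreateModel();

byte[] body = Encoding.UTF8.GetBytes("{\"ProductId\":1}");
channel.BasicPublish(exchange: "products.exchange",
                     routingKey: "internal",   // "external" would route to product-queue instead
                     basicProperties: null,
                     body: body);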
- Note: This is where MediatR comes in, if you have everything in the same app.
3 - CQRS Implementation in C#
The code itself is quite simple: we create the APIs and their corresponding endpoints.
One for creating and one for updating in the Write API:
[HttpPost(Name = "addproduct")]public Task<IActionResult> AddProduct(CreateProductRequest createProductRequest){ throw new NotImplementedException();}[HttpPut("updateproductdetails/{id}")]public Task<IActionResult> UpdateProductDetails(int id, ProductDetails productDetails){ throw new NotImplementedException();}
As you can see, Create takes a CreateProductRequest that includes all the data: stock and price, while update is just for the details, that is, the name and description.
public record CreateProductRequest(ProductDetails Details, int Stock, decimal Price);
public record ProductDetails(string Name, string Description);
public record FullProductResponse(int Id, ProductDetails Details, int Stock, decimal Price);
public record ProductUpdated(int ProductId, ProductDetails Details);
public record ProductCreated(int Id, CreateProductRequest ProductRequest);
- As previously mentioned, depending on the orchestration strategy, you’ll need to configure the application accordingly.
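For instance, if you run the write side as its own API, the host wiring could look roughly like this (a hypothetical Program.cs; ICreateProductDetails and IUpdateProductDetails are the write-side use cases shown later in this section, and the exact Distribt setup differs):

var builder = WebApplication.CreateBuilder(args);

// Register the write-side use cases so the controllers can receive them via constructor injection.
builder.Services.AddScoped<ICreateProductDetails, CreateProductDetails>();
builder.Services.AddScoped<IUpdateProductDetails, UpdateProductDetails>();
builder.Services.AddControllers();

var app = builder.Build();
app.MapControllers();
app.Run();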
For reading products (Read API):
[HttpGet("{productId}")]public Task<IActionResult> GetProduct(int productId){ throw new NotImplementedException();}
Command/Query in Type Names
A common debate when discussing CQRS is whether or not to embed "command" or "query" into your type names. For example, a product query type could be called ProductQuery, and a command for inserting a product CreateProductCommand.
This discussion often leads to team arguments, so I'll share my opinion: I don't care whether you use Query, Command, or nothing at all, as long as the name is clear and easily understood.
Back to the Code...
Let’s modify the write-side controllers and create a use case:
[ApiController]
[Route("[controller]")]
public class ProductController
{
    private readonly IUpdateProductDetails _updateProductDetails;
    private readonly ICreateProductDetails _createProductDetails;

    public ProductController(IUpdateProductDetails updateProductDetails, ICreateProductDetails createProductDetails)
    {
        _updateProductDetails = updateProductDetails;
        _createProductDetails = createProductDetails;
    }

    [HttpPost(Name = "addproduct")]
    public async Task<IActionResult> AddProduct(CreateProductRequest createProductRequest)
    {
        CreateProductResponse result = await _createProductDetails.Execute(createProductRequest);
        return new CreatedResult(new Uri(result.Url), null);
    }

    [HttpPut("updateproductdetails/{id}")]
    public async Task<IActionResult> UpdateProductDetails(int id, ProductDetails productDetails)
    {
        bool result = await _updateProductDetails.Execute(id, productDetails);
        return new OkResult();
    }
}
- Note: This post only shows the update use case; the GitHub repo has the rest.
public interface IUpdateProductDetails
{
    Task<bool> Execute(int id, ProductDetails productDetails);
}

public class UpdateProductDetails : IUpdateProductDetails
{
    private readonly IProductsWriteStore _writeStore;
    private readonly IDomainMessagePublisher _domainMessagePublisher;

    public UpdateProductDetails(IProductsWriteStore writeStore, IDomainMessagePublisher domainMessagePublisher)
    {
        _writeStore = writeStore;
        _domainMessagePublisher = domainMessagePublisher;
    }

    public async Task<bool> Execute(int id, ProductDetails productDetails)
    {
        await _writeStore.UpdateProduct(id, productDetails);
        await _domainMessagePublisher.Publish(new ProductUpdated(id, productDetails), routingKey: "internal");
        return true;
    }
}
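For completeness, here's a hedged sketch of what the write store behind IProductsWriteStore could look like, using Dapper and MySqlConnector (both are assumptions; the actual Distribt implementation may differ):

using Dapper;
using MySqlConnector;

public interface IProductsWriteStore
{
    Task UpdateProduct(int id, ProductDetails productDetails);
}

public class ProductsWriteStore : IProductsWriteStore
{
    private readonly string _connectionString;

    public ProductsWriteStore(string connectionString) => _connectionString = connectionString;

    public async Task UpdateProduct(int id, ProductDetails productDetails)
    {
        // Dapper opens the connection on demand and maps the anonymous object to the parameters.
        await using var connection = new MySqlConnection(_connectionString);
        await connection.ExecuteAsync(
            "UPDATE Products SET Name = @Name, Description = @Description WHERE Id = @Id",
            new { Id = id, productDetails.Name, productDetails.Description });
    }
}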
At this point, our message is in the service bus.
With this code, we're halfway done. All that’s left is consuming the event to update the read database and enable data retrieval.
For that, we create the handler that updates the info. In our case, it also creates an integration message.
public class ProductUpdatedHandler : IDomainMessageHandler<ProductUpdated>{ private readonly IProductsReadStore _readStore; private readonly IIntegrationMessagePublisher _integrationMessagePublisher; public ProductUpdatedHandler(IProductsReadStore readStore, IIntegrationMessagePublisher integrationMessagePublisher) { _readStore = readStore; _integrationMessagePublisher = integrationMessagePublisher; } public async Task Handle(DomainMessage<ProductUpdated> message, CancellationToken cancelToken = default(CancellationToken)) { await _readStore.UpsertProductViewDetails(message.Content.ProductId, message.Content.Details, cancelToken); await _integrationMessagePublisher.Publish( new ProductUpdated(message.Content.ProductId, message.Content.Details), routingKey:"external", cancellationToken: cancelToken); }}
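And a hedged sketch of the read store on the Mongo side (the names and driver calls are inferred from the usage above; check the repo for the real implementation):

using MongoDB.Driver;

public interface IProductsReadStore
{
    Task UpsertProductViewDetails(int id, ProductDetails details, CancellationToken ct = default);
    Task<FullProductResponse> GetFullProduct(int id);
}

public class ProductsReadStore : IProductsReadStore
{
    private readonly IMongoCollection<FullProductResponse> _products;

    public ProductsReadStore(IMongoDatabase database)
        => _products = database.GetCollection<FullProductResponse>("Products");

    // Upsert: update the Details of the matching view document, inserting it if missing.
    public async Task UpsertProductViewDetails(int id, ProductDetails details, CancellationToken ct = default)
        => await _products.UpdateOneAsync(
            Builders<FullProductResponse>.Filter.Eq(p => p.Id, id),
            Builders<FullProductResponse>.Update.Set(p => p.Details, details),
            new UpdateOptions { IsUpsert = true },
            ct);

    // Read: a single ID lookup on the pre-built document, no joins.
    public async Task<FullProductResponse> GetFullProduct(int id)
        => await _products.Find(p => p.Id == id).SingleAsync();
}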
The logic remains the same: the event is the trigger that causes the command to be executed.
Now, we have the value updated in the read database:
And finally, we need to create the simple get. You could even implement this with a minimal API, as there is no real logic here:
app.MapGet("product/{productId}", async (int productId, IProductsReadStore readStore)
    => await readStore.GetFullProduct(productId));
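To try it end to end, you can call the endpoint like any other HTTP API (a sketch that assumes the read API listens on port 50321, as registered in Consul earlier):

using System.Net.Http.Json;

using var http = new HttpClient { BaseAddress = new Uri("http://localhost:50321") };

// Fetches the aggregated view document through the minimal API endpoint above.
FullProductResponse? product = await http.GetFromJsonAsync<FullProductResponse>("product/1");
Console.WriteLine(product);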
You can see how the element has been updated.
Conclusion
In this post, we’ve quickly covered what CQRS is, its main characteristics, and how to implement it in C#.
If there is any problem, you can add a comment below or contact me through the website's contact form.