In this post, we’ll continue our Distribt course. I say continue because Event Sourcing usually goes hand-in-hand with CQRS (covered in the last post), though not always.
Here, we'll dive into what Event Sourcing is in detail and how to implement it in C#.
- Note: Event Sourcing and DDD (Domain-Driven Design) are often seen together, as it’s very common to use aggregates in Event Sourcing, and aggregates are part of the DDD pattern.
Of course, you always have access to the code on GitHub in the Distribt library.
1 - Current Situation
Before starting this new post, let’s see how our current system works. We have a database, which we update every time the value changes.
In our case, we have the endpoints addproduct and updateproductdetails/{id}, which allow us to create and edit a product.
So, if we create a product with the name “item1” and the description “description1” and later update it, we’ll have the following scenario:
We have a table with a record, and when we update it, we update the value in that record — the old value disappears from the database.
And this works; it’s a normal database update, what’s referred to as the current state.
- It’s common to have logs, either in the database or in files, which indicate changes or even a copy of the entire table each time something changes.
2 - What is Event Sourcing
With event sourcing, what we do is have a database that will contain all the events that occur in our system.
By events, we mean the user actions. And when you combine them all, you reach the current state.
For example, you might have multiple events: the first is the product’s creation, followed by its updates. By replaying them in order, we again reach the current state.
Technically, we could store just the event in the database, but it may be a good idea to store other information like the date, or (as we’ll see in the implementation) the order in which the events occur.
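As a sketch, a stored event entry might look like the following record (the names here are illustrative, not Distribt’s actual types): the event payload plus the metadata that lets us order and group events later.

```csharp
using System;

// Hypothetical shape of a stored event entry: the event payload plus
// metadata used to order and group events (names are illustrative).
public record StoredEvent(
    Guid AggregateId,    // which product/order the event belongs to
    int Version,         // position of the event within its stream
    DateTime OccurredAt, // when the event happened
    string EventType,    // e.g. "ProductCreated"
    string Payload);     // the serialized event content
```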
2.1 - Importance of Naming in Event Sourcing
I want to give special mention to event names — like with variables in programming, event names need to be self-explanatory and clear.
Imagine we have our Orders microservice (Distribt.Orders). We could have an event called “OrderUpdated” or several more descriptive events: one indicating a sale has been generated (“OrderCreated”), one indicating the order has been paid (“OrderPaid”), one for when the order has been sent from the warehouse (“OrderDispatched”), and so on. The more granular, the clearer it will be.
Additionally, the first option, “OrderUpdated”, typically means sending all the information in every event, whereas having specific events allows us to send only the necessary information.
I say this from experience — I once joined a project with over 20 events called “created” or “updated”. Of course, the namespace was a clue, but it wasn’t intuitive.
It’s also important to minimize the amount of information we store. For instance, if we’re changing the order status, it doesn’t make sense to send the items or the shipping address.
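To illustrate the contrast (these records are simplified sketches, not Distribt’s real contracts): a single coarse event forces every field into every message, while granular events carry only the data each change needs.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: a coarse "updated" event forces every field into
// every message, while granular events carry just what changed.
public record OrderUpdated(Guid OrderId, string Status, List<string> ProductIds, string ShippingAddress);

public record OrderCreated(Guid OrderId, List<string> ProductIds, string ShippingAddress);
public record OrderPaid(Guid OrderId);                        // nothing but the id is needed
public record OrderDispatched(Guid OrderId, DateTime SentAt); // when it left the warehouse
```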
2.2 - Source of Truth in Event Sourcing
With this change, our events become actions that have happened in the system. That’s why we call events the “source of truth” — they contain everything that has happened in the system for a specific ID.
To store the events, we use what’s called an eventStore; you can use any type of database, though usually it’s a document-based DB like MongoDB, DocumentDB, EventStoreDB, etc.
In this example, we'll use MongoDB with a collection called “events”.
If you're going to use Event Sourcing along with CQRS, remember that this applies only to the write database, which we'll use to store the record of state changes.
2.3 - Aggregating Events
As you’ve seen, we’re creating events focused on a product. These events are all “linked”, and they’re linked in what we call an aggregate
. When working with Event Sourcing and you want to read the current state of an object (in this case, a product), you need to read all the elements in the eventStore referencing that object and reconstruct the aggregate with the final information.
You may wonder whether this is bad for the database, or if it impacts performance. The answer is yes, you’re loading more data than if you just had a single element or row.
But despite this, in practice, the difference is minimal. If it starts to take longer than is acceptable, you can always take snapshots of the data. That is, from day X onward, the current state of each aggregate becomes the “new first event”, and you can archive all previous data.
You’ll lose the full history (you can move it to cold storage), but you’ll gain speed. Personally, I’ve worked with Event Sourcing for years and never had an issue with this.
At the end of the day, there’s no one-size-fits-all solution — every company/team has priorities that define the final approach.
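The replay idea from this section can be sketched as a simple loop over the ordered event list (illustrative code, not Distribt’s implementation): the current state is whatever you end up with after applying every stored event in order.

```csharp
using System;
using System.Collections.Generic;

// Minimal sketch of event replay (illustrative, not Distribt's code):
// the current state is obtained by applying every stored event in order.
public abstract record ProductEvent;
public record ProductCreated(string Name, string Description) : ProductEvent;
public record DescriptionUpdated(string Description) : ProductEvent;

public class ProductState
{
    public string Name { get; private set; } = "";
    public string Description { get; private set; } = "";

    // Replaying the full history yields the current state.
    public static ProductState Replay(IEnumerable<ProductEvent> history)
    {
        var state = new ProductState();
        foreach (var ev in history)
        {
            switch (ev)
            {
                case ProductCreated c:
                    state.Name = c.Name;
                    state.Description = c.Description;
                    break;
                case DescriptionUpdated u:
                    state.Description = u.Description;
                    break;
            }
        }
        return state;
    }
}
```

Using the example from section 1, replaying the creation of “item1”/“description1” followed by an update leaves the state at the latest values, without the old value ever being lost from the store.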
3 - When to Use the Event Sourcing Pattern
Given this additional complexity, you might wonder if it’s truly advantageous to use this pattern. Here are the main advantages; you’ll have to decide if it’s worth implementing. As with everything, there are contexts where it’s very beneficial, and others where it might not be. It’s also often tied to DDD.
So, we’ll use it whenever we can get these key benefits:
3.1 - Benefits of Using Event Sourcing
- Observability. Having all the events lets us identify why a record has a certain value: we know where, when, and why each record has been modified. This gives us confidence that the information is completely reliable.
- Fault Tolerance. To me, this is the main advantage of Event Sourcing: since we keep track of every event that happens in our system, we can verify the information in the read store. We can also experiment in the read store, and if something goes wrong, we simply replay all the events.
- Asynchronous Communication. Using Event Sourcing generally requires you to build the APIs you expose to clients with asynchrony in mind, which can bring many benefits if you follow best practices.
- Auditing/logs. Although it’s not its main purpose, you can perfectly use it for action auditing. It’s a better alternative than a table just for logs.
4 - Implementing Event Sourcing in C#
To further expand the project content and show that different patterns can coexist in the same system, for Event Sourcing we’ll work in the Distribt.Orders microservice.
Until now, all we have is an API that returns fake data, which we are going to change. First, we need to create the connection between our code and the database.
4.1 - Persisting an Aggregate Using MongoDb
The code is quite extensive, so I’ll focus on the main parts. All the Event Sourcing logic is in the Distribt.Shared.EventSourcing project, which is linked to MongoDB. This is a personal decision; you could have picked any other database or event store to work with events.
To use that library, from the client application you should call webappBuilder.Services.AddEventSourcing(webappBuilder.Configuration);
When you register this service, the IEventStore interface is added to the dependency injection container; it is used by the aggregate and communicates with MongoDB.
A - The Aggregate
The aggregate is the type we’ll use to handle all the business logic of our object.
This Aggregate type (also called AggregateRoot) contains a list of events, the Id, and the current “version” — that is, the number of events.
Finally, there’s certain logic to know when a change comes from the database or is a new change to be stored.
```csharp
namespace Distribt.Shared.EventSourcing;

public class Aggregate
{
    private List<AggregateChange> _changes = new List<AggregateChange>();
    public Guid Id { get; internal set; }
    private string AggregateType => GetType().Name;
    public int Version { get; set; } = 0;

    /// <summary>
    /// this flag is used to identify when an event is being loaded from the DB
    /// or when the event is being created as new
    /// </summary>
    private bool ReadingFromHistory { get; set; } = false;

    protected Aggregate(Guid id)
    {
        Id = id;
    }

    internal void Initialize(Guid id)
    {
        Id = id;
        _changes = new List<AggregateChange>();
    }

    public List<AggregateChange> GetUncommittedChanges()
    {
        return _changes.Where(a => a.IsNew).ToList();
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }

    protected void ApplyChange<T>(T eventObject)
    {
        if (eventObject == null)
            throw new ArgumentException("you cannot pass a null object into the aggregate");

        Version++;
        AggregateChange change = new AggregateChange(
            eventObject,
            Id,
            eventObject.GetType(),
            $"{Id}:{Version}",
            Version,
            ReadingFromHistory != true
        );
        _changes.Add(change);
    }

    public void LoadFromHistory(IList<AggregateChange> history)
    {
        if (!history.Any())
        {
            return;
        }

        ReadingFromHistory = true;
        foreach (var e in history)
        {
            ApplyChanges(e.Content);
        }
        ReadingFromHistory = false;

        Version = history.Last().Version;

        void ApplyChanges<T>(T eventObject)
        {
            this.AsDynamic()!.Apply(eventObject);
        }
    }
}
```
When you create your own domain object, it must inherit from Aggregate to behave as an aggregate, and it must implement the IApply<T> interface for each event that the object will work with.
```csharp
public class OrderDetails : Aggregate,
    IApply<OrderCreated>, IApply<OrderPaid>, IApply<OrderDispatched>, IApply<OrderCompleted>
{
    //Code
}
```
As we see in the example, we have the aggregate OrderDetails and an IApply<T> for each of the previously shown events.
B - Saving the Aggregate in MongoDb
When storing information, we do it in a particular way — we don't just store the event but also information to identify or group it with others.
This info is what you see in the AggregateChange and AggregateChangeDto types, which contain data like:
- Content: the object content (includes the event).
- AggregateId: the aggregate ID; in our case, the order ID.
- AggregateType: to know the type.
- Version: the aggregate version; every new event adds 1.
- TransactionId: a combination of the Id and the version.
Since we’re storing this information, we create an index in MongoDb with the Id, type, and version (this is done automatically in the library).
Inside the library, there’s a class called AggregateRepository<TAggregate> that your repository must extend. The AggregateRepository<TAggregate> type exposes two methods:
- GetByIdAsync: reads from the database by ID and reconstructs the Aggregate in the correct order.
- SaveAsync: saves new events to the database.
This allows you to either inject IAggregateRepository<TAggregate> into your services or create your own repository extending AggregateRepository<TAggregate> (my recommended option):
```csharp
public interface IOrderRepository
{
    Task<OrderDetails> GetById(Guid id, CancellationToken cancellationToken = default(CancellationToken));
    Task Save(OrderDetails orderDetails, CancellationToken cancellationToken = default(CancellationToken));
}

public class OrderRepository : AggregateRepository<OrderDetails>, IOrderRepository
{
    public OrderRepository(IEventStore eventStore)
        : base(eventStore)
    {
    }

    public async Task<OrderDetails> GetById(Guid id, CancellationToken cancellationToken = default(CancellationToken))
        => await GetByIdAsync(id, cancellationToken);

    public async Task Save(OrderDetails orderDetails, CancellationToken cancellationToken = default(CancellationToken))
        => await SaveAsync(orderDetails, cancellationToken);
}
```
As you can see, the implementation is very straightforward.
Note: The current version has a bug/feature: the events must be registered manually in the BsonSerializer during app initialization.
```csharp
public static class MongoMapping
{
    public static void RegisterClasses()
    {
        //#22 find a way to register the classes automatically or avoid the registration
        BsonClassMap.RegisterClassMap<OrderCreated>();
        BsonClassMap.RegisterClassMap<OrderPaid>();
        BsonClassMap.RegisterClassMap<OrderDispatched>();
    }
}
```
If you can help find a solution, I’d be very grateful. Thanks!
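One possible direction for issue #22, sketched here under an assumption: the events would need to share a marker interface (or some other discoverable trait), which they currently don’t. A reflection scan at startup could then find every event type and pass each one to a registration callback; with MongoDB that callback might wrap BsonClassMap, for example `t => { var cm = new BsonClassMap(t); cm.AutoMap(); BsonClassMap.RegisterClassMap(cm); }`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Assumption for this sketch: every domain event implements a marker
// interface so that reflection can discover them.
public interface IDomainEvent { }

public record OrderCreated(Guid OrderId) : IDomainEvent;
public record OrderPaid(Guid OrderId) : IDomainEvent;

public static class EventTypeScanner
{
    // Finds every concrete event type in the assembly and hands each one
    // to the registration callback (with MongoDB this would wrap BsonClassMap).
    public static IReadOnlyList<Type> RegisterAll(Assembly assembly, Action<Type> register)
    {
        var eventTypes = assembly.GetTypes()
            .Where(t => typeof(IDomainEvent).IsAssignableFrom(t) && t.IsClass && !t.IsAbstract)
            .ToList();

        foreach (var type in eventTypes)
        {
            register(type);
        }
        return eventTypes;
    }
}
```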
Finally, you need to specify which database and collection to connect to, via configuration in the appsettings file, in the following section:
"EventSourcing": { "DatabaseName" : "distribt", "CollectionName" : "EventsOrders"},
4.2 - Creating an Aggregate
Now let’s move to our type’s logic, or how our type will change depending on the event.
Our use case is creating an order and handling its possible modifications. So, we create the OrderDetails type and add the information our Order will contain. Of course, we inherit from Aggregate, and we’re required to have a constructor with a unique Id.
With this, we only have a default object and an Id. Now we need to start applying events that may happen to our object.
The first one is object creation:
```csharp
public class OrderDetails : Aggregate, IApply<OrderCreated>
{
    public DeliveryDetails Delivery { get; private set; } = default!;
    public PaymentInformation PaymentInformation { get; private set; } = default!;
    public List<ProductQuantity> Products { get; private set; } = new List<ProductQuantity>();
    public OrderStatus Status { get; private set; }

    public OrderDetails(Guid id) : base(id)
    {
    }

    public void Apply(OrderCreated ev)
    {
        Delivery = ev.Delivery;
        PaymentInformation = ev.PaymentInformation;
        Products = ev.Products;
        Status = OrderStatus.Created;
        ApplyChange(ev);
    }
}
```
As you see, by implementing the interface, we create a method called Apply that receives the event. Inside the method, we modify the object as needed and call ApplyChange, which will store the event as new. When saving the aggregate through AggregateRepository, it will detect the new event and save it.
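This new-versus-replayed distinction is the key mechanism, so here is a stripped-down illustration of the idea (not the real Distribt Aggregate): events applied during a history replay are not treated as uncommitted, while events applied afterwards are.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A stripped-down illustration (not the real Distribt Aggregate) of how
// ApplyChange distinguishes replayed events from new, uncommitted ones.
public class MiniAggregate
{
    private readonly List<(object Event, bool IsNew)> _changes = new();
    public int Version { get; private set; }
    private bool ReadingFromHistory { get; set; }

    public void ApplyChange(object ev)
    {
        Version++;
        // Only events applied outside of a history replay are "new".
        _changes.Add((ev, !ReadingFromHistory));
    }

    public List<object> GetUncommittedChanges() =>
        _changes.Where(c => c.IsNew).Select(c => c.Event).ToList();

    public void LoadFromHistory(IEnumerable<object> history)
    {
        ReadingFromHistory = true;
        foreach (var ev in history)
        {
            ApplyChange(ev);
        }
        ReadingFromHistory = false;
    }
}
```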
Now, we need to repeat the same action with the rest of the events:
```csharp
public class OrderDetails : Aggregate,
    IApply<OrderCreated>, IApply<OrderPaid>, IApply<OrderDispatched>, IApply<OrderCompleted>
{
    public DeliveryDetails Delivery { get; private set; } = default!;
    public PaymentInformation PaymentInformation { get; private set; } = default!;
    public List<ProductQuantity> Products { get; private set; } = new List<ProductQuantity>();
    public OrderStatus Status { get; private set; }

    public OrderDetails(Guid id) : base(id)
    {
    }

    public void Apply(OrderCreated ev)
    {
        Delivery = ev.Delivery;
        PaymentInformation = ev.PaymentInformation;
        Products = ev.Products;
        Status = OrderStatus.Created;
        ApplyChange(ev);
    }

    public void Apply(OrderPaid ev)
    {
        Status = OrderStatus.Paid;
        ApplyChange(ev);
    }

    public void Apply(OrderDispatched ev)
    {
        Status = OrderStatus.Dispatched;
        ApplyChange(ev);
    }

    public void Apply(OrderCompleted ev)
    {
        Status = OrderStatus.Completed;
        ApplyChange(ev);
    }
}
```
4.3 - Creating Use Cases with Event Sourcing
To keep this post simple, I won’t specify all use cases, only the ones for creating an order and marking it as paid. All other cases follow the same logic.
- Note: In the code, all use cases are implemented.
The first thing is to change the endpoint so that instead of returning random data, we call the service we created and return real information.

The endpoint used to look like this:

```csharp
[HttpPost("create")]
public async Task<ActionResult<Guid>> CreateOrder(CreateOrderRequest createOrderRequest,
    CancellationToken cancellationToken = default(CancellationToken))
{
    OrderDto orderDto = new OrderDto(Guid.NewGuid(), createOrderRequest.orderAddress,
        createOrderRequest.PersonalDetails, createOrderRequest.Products);
    await _domainMessagePublisher.Publish(orderDto, routingKey: "order");
    return new AcceptedResult($"getorderstatus/{orderDto.orderId}", orderDto.orderId);
}
```

And now it delegates to the service:

```csharp
[ApiController]
[Route("[controller]")]
public class OrderController
{
    private readonly ICreateOrderService _createOrderService;

    public OrderController(ICreateOrderService createOrderService)
    {
        _createOrderService = createOrderService;
    }

    [HttpPost("create")]
    public async Task<ActionResult<Guid>> CreateOrder(CreateOrderRequest createOrderRequest,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        Guid orderId = await _createOrderService.Execute(createOrderRequest, cancellationToken);
        return new AcceptedResult($"getorderstatus/{orderId}", orderId);
    }
}
```
Now, let’s create our CreateOrderService. As seen in the endpoint, besides saving the data in the database, it also generates a domain message.
In the logic, we create a new aggregate via OrderDetails, apply the change (here it’s OrderCreated, with the .Apply() method), and then save it to the repository.
- Note: For now, leave the domain event publishing, but if you’re not using CQRS, you’ll probably switch to an integration message.
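The described flow could be sketched like this. The types below are simplified, self-contained stand-ins (not the actual Distribt code): the real OrderDetails inherits from Aggregate, and the real request carries delivery, payment, and product data rather than a single address string.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the create-order use case: build the aggregate,
// apply OrderCreated, persist it, then publish the domain event.
public record OrderCreated(string Address);

public class OrderDetails
{
    public Guid Id { get; }
    public string Address { get; private set; } = "";
    public OrderDetails(Guid id) => Id = id;
    public void Apply(OrderCreated ev) => Address = ev.Address;
}

public interface IOrderRepository
{
    Task Save(OrderDetails order, CancellationToken ct = default);
}

public interface IDomainMessagePublisher
{
    Task Publish(object message, string routingKey);
}

public class CreateOrderService
{
    private readonly IOrderRepository _repository;
    private readonly IDomainMessagePublisher _publisher;

    public CreateOrderService(IOrderRepository repository, IDomainMessagePublisher publisher)
    {
        _repository = repository;
        _publisher = publisher;
    }

    public async Task<Guid> Execute(string address, CancellationToken ct = default)
    {
        var order = new OrderDetails(Guid.NewGuid()); // 1. new aggregate with a fresh id
        var created = new OrderCreated(address);
        order.Apply(created);                         // 2. apply the creation event
        await _repository.Save(order, ct);            // 3. persist the new event(s)
        await _publisher.Publish(created, "order");   // 4. notify the read side (CQRS)
        return order.Id;
    }
}

// Simple in-memory stand-ins, useful for exercising the service.
public class InMemoryOrderRepository : IOrderRepository
{
    public List<OrderDetails> Saved { get; } = new();
    public Task Save(OrderDetails order, CancellationToken ct = default)
    {
        Saved.Add(order);
        return Task.CompletedTask;
    }
}

public class InMemoryPublisher : IDomainMessagePublisher
{
    public List<object> Messages { get; } = new();
    public Task Publish(object message, string routingKey)
    {
        Messages.Add(message);
        return Task.CompletedTask;
    }
}
```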
4.3.1 - Adding a New Event
Once we’ve created and saved the aggregate (we can check it in the DB), let’s create another event — this time to indicate it's paid.
For this we’ll create the OrderPaidService use case. All we need to do is apply the OrderPaid event and save:
```csharp
public interface IOrderPaidService
{
    Task<bool> Execute(Guid orderId, CancellationToken cancellationToken = default(CancellationToken));
}

public class OrderPaidService : IOrderPaidService
{
    private readonly IOrderRepository _orderRepository;

    public OrderPaidService(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }

    public async Task<bool> Execute(Guid orderId, CancellationToken cancellationToken = default(CancellationToken))
    {
        OrderDetails orderDetails = await _orderRepository.GetById(orderId, cancellationToken);
        orderDetails.Apply(new OrderPaid());
        await _orderRepository.Save(orderDetails, cancellationToken);
        return true;
    }
}
```
If we now check the database, we’ll see that we have just two events. Also, these events contain only the information that changed, not the entire object. In the case of OrderPaid, it contains no data at all — it’s merely the event that changes the status.
And if we read the order through GetOrder, we can see the information is correct:
5 - Event Sourcing vs Event Driven
Finally, don’t confuse event sourcing (which is about keeping a state or event history for data we have) with event-driven (which is about using events to communicate with other parts of the system — internally with domain events or externally with integration events).
Conclusion
In this post, we’ve seen:
- What Event Sourcing is.
- The difference between Event Sourcing and event-driven architecture.
- How to apply Event Sourcing to our code.
- How to implement Event Sourcing with .NET and MongoDB.
If there is any problem, you can add a comment below or contact me via the website's contact form.