Hello, hope you are doing great!
Thank you for subscribing to this newsletter.
This is the first in a series of posts I'll be writing to build a distributed message broker like Kafka from the bare bones in Go. Along with the code, you'll find discussions of the applicable system architecture concepts to help you build a comprehensive understanding of how a distributed system like Kafka and other log-based systems function and scale.
The posts collectively will form a crash course on coding distributed systems from scratch.
With that said, let's jump right in.
Kafka is a distributed commit log
What does this mean?
A log-based structure is central to distributed messaging and event streaming systems such as Kafka, AWS Kinesis, and Redpanda, as well as stream processing engines like Apache Flink.
A commit log, often also called an append-only log, maintains a sequence of records of the events happening in a system, or of the changes in its state, in an immutable, append-only fashion. No event or record in the log sequence is ever modified or deleted.
This structure ensures durability and consistency in a distributed system and has several use cases.
Kafka appends the messages sent to the topics in a commit log, which is an immutable sequence of events consumed by the subscribers.
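To make this concrete, here's a minimal sketch of an append-only log in Go. The AppendOnlyLog and Record names are my own for illustration; they don't reflect Kafka's actual internals.

```go
package main

import "fmt"

// Record is a single immutable entry in the log.
type Record struct {
	Offset  int
	Message string
}

// AppendOnlyLog only ever grows; there is no update or delete.
type AppendOnlyLog struct {
	records []Record
}

// Append adds a record at the end and returns its offset.
func (l *AppendOnlyLog) Append(message string) int {
	offset := len(l.records)
	l.records = append(l.records, Record{Offset: offset, Message: message})
	return offset
}

// Read returns the record at a given offset, so consumers can
// replay the sequence in the exact order it was written.
func (l *AppendOnlyLog) Read(offset int) (Record, bool) {
	if offset < 0 || offset >= len(l.records) {
		return Record{}, false
	}
	return l.records[offset], true
}

func main() {
	log := &AppendOnlyLog{}
	log.Append("order-placed")
	log.Append("order-shipped")
	for i := 0; ; i++ {
		rec, ok := log.Read(i)
		if !ok {
			break
		}
		fmt.Printf("offset=%d message=%s\n", rec.Offset, rec.Message)
	}
}
```

Note that the only write operation exposed is Append; consumers read by offset, which is exactly what lets a log preserve ordering and support replay.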
Commit/append-only log data structure use cases
Imagine a stock trading service that pushes the stock events to a message broker to be consumed by the downstream services. Maintaining the order of transactions (events) is crucial for the business to avoid any financial inconsistencies, for smooth transaction reversals, to maintain a record of state changes (aka event sourcing) for auditing and dispute management, and such.
A commit log fits best in this use case in contrast to any other data structure, for instance, a queue, which message brokers typically leverage. (More in-depth discussion on the architectures of queue-based and log-based systems coming up in the forthcoming posts).
A distributed commit log keeps transactions durable, as they can be persisted to disk with retention based on time or on the size of the log data. This log data, organized into topics and partitions, can be spread across a cluster of nodes, increasing throughput and making the service scalable and available.
In case a node fails, the data is not lost, as it is replicated to other nodes with the help of another use of the log: the replicated log. A replicated log ensures all the nodes in the cluster have identical copies of the data, keeping the cluster state consistent.
Furthermore, since the data structure is append-only, the transactions or events can be replayed to understand a certain flow of system events without any fuss.
Other uses of append-only commit logs
Kafka isn't the first implementation of the append-only log. Databases use commit logs called write-ahead logs to store a log of data changes before they are applied to the actual data.
In the event of a failure, DBs rely on these logs to recover and maintain data consistency. These logs are a key enabler in achieving ACID in databases.
Consensus protocols like Raft maintain a replicated log of changes to keep the cluster state consistent. The leader appends events to the log, and the log is then replicated to all the follower nodes in the cluster.
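A drastically simplified sketch of that replication flow in Go, with hypothetical node and replicate names of my own (real Raft adds terms, leader election, acknowledgements, and a commit index):

```go
package main

import "fmt"

// node holds its own copy of the log; in real Raft, replication
// happens over the network, not via shared memory.
type node struct {
	id  string
	log []string
}

// replicate appends an entry on the leader, then copies it to every
// follower so all nodes converge on an identical log.
func replicate(leader *node, followers []*node, entry string) {
	leader.log = append(leader.log, entry)
	for _, f := range followers {
		f.log = append(f.log, entry)
	}
}

func main() {
	leader := &node{id: "node-1"}
	followers := []*node{{id: "node-2"}, {id: "node-3"}}
	replicate(leader, followers, "set x=1")
	replicate(leader, followers, "set x=2")
	for _, n := range append([]*node{leader}, followers...) {
		fmt.Println(n.id, n.log)
	}
}
```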
Now that we have an understanding of the central data structure of our message broker, let's create a cluster node to store a log.
type Node struct {
    ID        string
    Status    string
    CommitLog []*storage.Log
}
Before I delve into the code explanation, let's understand what a node is. What does it mean, really?
Cluster node
A node represents a single instance in a cluster or a distributed system. We can see it as a single unit of compute or storage and it can be implemented/deployed in different forms, depending on the context.
When we run a message broker cluster comprising multiple nodes on our local machine, where each node is responsible for storing logs and interacting with the producer and the consumer, a single node means a single isolated process running on the machine.
Every node in the cluster runs as an isolated process on its own port on our local machine. All these cluster nodes have process-level isolation, simulating a distributed cluster.
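Our broker node doesn't listen on the network yet, but as a sketch of what "one node per port" looks like, here is a minimal Go listener; the startNode helper is hypothetical and not part of the broker code in this post.

```go
package main

import (
	"fmt"
	"net"
)

// startNode binds a listener on its own port; running this binary
// twice with different ports gives two isolated "nodes" on one machine.
func startNode(port string) (net.Listener, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:"+port)
	if err != nil {
		return nil, err // the port is taken: another process owns it
	}
	fmt.Println("node listening on", ln.Addr())
	return ln, nil
}

func main() {
	ln, err := startNode("9001")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
}
```

Because the OS hands each port to exactly one process, the port number effectively becomes the node's local identity.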
In case you wish to understand ports, I've discussed ports, sockets and IP addresses in another post where I've implemented a single-threaded TCP/IP server in Java. Do check it out.
If we run all these nodes as separate containers in Docker, they will now have container-level isolation on our local machine. Every container will have its own broker code, dependencies and environment.
This is more like a production-like environment where all the nodes are isolated and running independently.
Now, if we deploy these container instances of nodes on different virtual machines on AWS, Azure or GCP, they will have VM-level isolation. Or if we deploy these nodes on bare-metal servers, they will have physical server isolation.
In this scenario, we can say one bare metal server is one node of our distributed message broker. So, what a node is in a distributed system totally depends on the context. At the same time, we can see it as a single unit of compute or storage.
Let's get back to our node code.
Creating a node for our message broker
type Node struct {
    ID        string
    Status    string
    CommitLog []*storage.Log
}
The above Go struct represents our message broker node. It contains three fields: ID, Status and a CommitLog, which is an append-only list of Logs.
ID is a unique identifier for the node. String type for ID allows for a flexible ID that may include a UUID or any other custom identifier. Status indicates the current status of the node (e.g., idle, active, down, terminated, etc.) and CommitLog is an append-only list of events or messages (contained within a Log struct) that the node receives from the event producer.
Log (in CommitLog []*storage.Log) is a struct that represents an entry in the commit log, and storage is the package where Log.go, the file containing the Log struct, is housed.
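For reference, here is the project layout I'm assuming in this series; the module name is my own placeholder, and your paths may differ:

```
messagebroker/
├── main.go          // wires the node and the log together
├── broker/
│   └── Node.go      // Node struct, NewNode, AddLog, PrintStatus
└── storage/
    └── Log.go       // Log struct, NewLog, PrintLog
```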
Here is the complete Node.go code:
type Node struct {
    ID        string
    Status    string
    CommitLog []*storage.Log
}

func NewNode(id string) *Node {
    return &Node{
        ID:        id,
        Status:    "active",
        CommitLog: []*storage.Log{},
    }
}

func (n *Node) PrintStatus() {
    logrus.Infof("Node ID: %s, Status: %s", n.ID, n.Status)
}

func (n *Node) AddLog(log *storage.Log) {
    n.CommitLog = append(n.CommitLog, log)
    logrus.Infof("Log added to Node ID %s: %s", n.ID, log.Message)
}
Event log
type Log struct {
    Timestamp time.Time
    Message   string
}
The Log struct, which represents an individual log entry in the commit log, has two fields: Timestamp, which represents the time when the event was logged, and Message, which is the event payload; in other words, the actual data being logged.
For logging the time, Go's time package is leveraged to provide precise date and time information, and for simplicity, the payload is accepted as a string data type. In the forthcoming posts, as we add more features to our message broker, we can create a complex data type to store the event payload as opposed to a string.
Here is the complete Log.go code:
type Log struct {
    Timestamp time.Time
    Message   string
}

func NewLog(message string) *Log {
    return &Log{
        Timestamp: time.Now(),
        Message:   message,
    }
}

func (l *Log) PrintLog() {
    logrus.Infof("[%s] Log Message: %s", l.Timestamp.Format(time.RFC3339), l.Message)
}
The Log.go file has a constructor function that takes the event payload as a string and returns a new Log instance.
func NewLog(message string) *Log {
    return &Log{
        Timestamp: time.Now(),
        Message:   message,
    }
}
We have another method, PrintLog(), defined on the Log struct in the same file.
func (l *Log) PrintLog() {
    logrus.Infof("[%s] Log Message: %s", l.Timestamp.Format(time.RFC3339), l.Message)
}
This method logs the details of the Log using the Logrus logging library.
Logging helps us keep track of events that happen in a distributed system comprising multiple nodes. It helps with debugging and understanding the flow of events.
Node.go has similar constructor functions and methods: NewNode creates a new node, taking the node ID as an argument, and PrintStatus logs the node's status.
func NewNode(id string) *Node {
    return &Node{
        ID:        id,
        Status:    "active",
        CommitLog: []*storage.Log{},
    }
}
func (n *Node) PrintStatus() {
    logrus.Infof("Node ID: %s, Status: %s", n.ID, n.Status)
}
We have another method, AddLog, which adds a log entry to the Node.
func (n *Node) AddLog(log *storage.Log) {
    n.CommitLog = append(n.CommitLog, log)
    logrus.Infof("Log added to Node ID %s: %s", n.ID, log.Message)
}
Running the code
In main.go, in the main function, we create a new message broker node, print its status, then create a new log entry manually and append it to the CommitLog.
func main() {
    logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})
    logrus.Info("Starting the message broker...")

    nodeID := "node-1"
    brokerNode := broker.NewNode(nodeID)
    brokerNode.PrintStatus()

    logEntry := storage.NewLog("Hello, world! This is the first log message.")
    brokerNode.AddLog(logEntry)
}
In case you are wondering where broker and storage come from: they are the packages housing Node.go and Log.go, which contain the NewNode and NewLog constructor functions.
Here are the results in the terminal:
Summary
Let's summarize the whole thing: we created a Node struct that represents a message broker node. It has a NewNode constructor that returns a new node instance, along with AddLog and PrintStatus methods.
We then created a Log struct, which represents an individual entry in the commit log. It has a NewLog constructor that creates a new Log instance and a PrintLog method that prints it.
In the main function, I created a new node via the NewNode constructor function with "node-1" as the node ID and then printed its status.
I then created a new Log instance with the help of the NewLog constructor function and added this Log instance to the CommitLog via the AddLog method.
In the forthcoming posts, we will extend our code by creating a producer API for our message broker to ingest logs from so that we don't manually have to create a log entry and append it to the commit log.
Also, we will create a consumer API to enable the consumers to consume logs. Subsequently, we will implement concepts such as message broker topics, partitions, and such, in addition to discussing the intricacies.
Just because I've coded this in Go doesn't mean you have to implement it in the same language. You can implement the message broker in the programming language of your choice, given that I've explained the high-level flow of the system.
Furthermore, you can:
Practice coding distributed systems in the programming language of your choice
CodeCrafters is a platform that is designed to help developers learn by building distributed systems like Redis, Docker, Git, Kafka, etc., from the bare bones.
With their interactive, hands-on exercises, you'll not only deeply understand how these complex systems work but also implement them step by step in the programming language of your choice, growing your engineering skills along the way.
Do check out their platform. If you enjoy the experience and decide to make a purchase, you can use my unique link to get 40% off.
The above link is an affiliate link. When you make a purchase through it, I get a small cut without you paying anything extra. Cheers!
Further reads on systems programming
I've implemented a single-threaded and multithreaded TCP/IP server in Java. You can go through the linked posts to understand how servers function.
You'll find more systems programming and systems architecture posts on this newsletter soon. If you haven't subscribed yet, I urge you to subscribe to have these posts slide into your inbox as soon as they are published.
If you found the post helpful, consider sharing it with your friends for more reach. You can find me on LinkedIn & X and can chat with me on Substack chat as well. I'll see you in the next post pretty soon. Until then, Cheers!