Kafka Series Vol. 2: From Serialization to Consumer Groups

Fatih Ünlü
9 min read · Jan 27, 2025

In my first post, “Kafka Series Vol. 1: What is Kafka?”, I introduced the fundamentals of Apache Kafka, including its core components: topics, partitions, brokers, producers, and consumers. I also provided a step-by-step guide on creating a Kafka instance using Docker.

Let’s continue where we left off. In the last example, we sent data to a topic from producers and consumed it from consumers. The data we sent was simply a string.

using var producer = new ProducerBuilder<string, string>(config).Build();

What if we want to send an object instead of a string?

Serialization & Deserialization

Whether we work in an object-oriented language like C# or in a language like Go, the data we want to send is often more complex than a simple string — such as an object or a struct. However, it’s important to understand that Kafka stores all data as byte arrays, meaning it expects the data we send to be in this format. The format of our data (e.g., JSON, Avro, etc.) can vary, but it must ultimately be serialized into a byte array.

In the previous example, we didn’t need to convert a string to a byte array explicitly. Why? Because the Confluent Kafka library automatically handles this conversion for string values. However, when dealing with objects, we need to serialize them explicitly. In my examples, I’ll be converting the objects into JSON format before sending them to Kafka.

On the consumer side, consumers will deserialize the data, converting it from a byte array back to a string and then into the desired object format. This serialization/deserialization process is critical to how Kafka enables communication between producers and consumers with different data formats.
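
To make that concrete, here is a minimal sketch of the round trip, assuming System.Text.Json (which the examples later in this post also use) and a simplified version of the OrderShippedNotification type that appears further down — the actual class in the repo may look slightly different:

using System.Text.Json;

// Producer side: object -> JSON string. The library's string serializer then
// turns the string into the byte array that Kafka actually stores.
var json = JsonSerializer.Serialize(new OrderShippedNotification
{
    OrderCode = "ORD-1",
    ShippedDate = DateTime.UtcNow,
    CarrierName = "random-carrier",
    TrackingNumber = "TRK-0000000001"
});

// Consumer side: byte array -> string (handled by the library) -> object.
var notification = JsonSerializer.Deserialize<OrderShippedNotification>(json);
Console.WriteLine(notification?.OrderCode);

// Simplified version of the event type used later in this post
// (properties inferred from the producer example below).
public class OrderShippedNotification
{
    public string OrderCode { get; set; } = string.Empty;
    public DateTime ShippedDate { get; set; }
    public string CarrierName { get; set; } = string.Empty;
    public string TrackingNumber { get; set; } = string.Empty;
}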

What is a Replica of Partitions?

In my previous post, I shared the diagram below to illustrate how brokers in Kafka store topic partitions. Each broker is responsible for hosting multiple partitions, which enables distributed data storage across the cluster.

Kafka Partition Distribution Across Brokers

Now, let’s take this concept further and discuss partition replication. In Apache Kafka, each partition is replicated across multiple brokers to ensure fault tolerance and high availability. For example, in the diagram above, we see partitions of different topics distributed across Broker 1, Broker 2, and Broker 3. However, what if Broker 1 goes down? This is where the replica of partitions becomes critical.

Each partition has one leader replica and follower replicas, spread across different brokers. The leader handles all read and write requests, while the followers sync data from the leader. If the broker hosting the leader fails, one of the followers is promoted to be the new leader, ensuring data remains accessible.

Partition replication is determined by the replication factor. For instance, with a replication factor of 3, each partition will have three copies: one leader and two followers. This approach guarantees data durability and ensures Kafka remains operational even in the face of broker failures.

By understanding the relationship between brokers, partitions, and replicas, we can see how Kafka achieves its robust and fault-tolerant architecture.

ISR (In-Sync Replicas) in Kafka is the set of replicas that are fully up to date with the leader replica of a partition. These replicas have the same data as the leader and can take over as the new leader if the current leader fails.
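
As a rough sketch of how this looks from code (assuming a cluster with at least three brokers; the single-broker Docker setup from Vol. 1 only supports a replication factor of 1), we could create a topic with a replication factor of 3 and then ask the cluster which replicas are currently in sync:

using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var adminClient = new AdminClientBuilder(new AdminClientConfig
{
    BootstrapServers = "localhost:9094"
}).Build();

// One leader and two followers per partition.
await adminClient.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name = "order-shipped-notifications",
        NumPartitions = 2,
        ReplicationFactor = 3
    }
});

// Inspect the leader, the full replica set, and the in-sync replicas (ISR) of each partition.
var metadata = adminClient.GetMetadata("order-shipped-notifications", TimeSpan.FromSeconds(10));
foreach (var partition in metadata.Topics[0].Partitions)
{
    Console.WriteLine(
        $"Partition {partition.PartitionId}: leader={partition.Leader}, " +
        $"replicas=[{string.Join(",", partition.Replicas)}], " +
        $"ISR=[{string.Join(",", partition.InSyncReplicas)}]");
}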

ACK in Kafka

ACK (acknowledgment) is how brokers confirm that they have received and stored a message successfully. This helps ensure messages are reliably delivered between producers and brokers.

When a producer sends a message, it can ask the broker for confirmation that the message was written to a partition. The type of acknowledgment depends on the ACK settings, which balance reliability and performance.

There are 3 types of ACKs in Kafka:

acks=0 (No Ack):
The producer doesn’t wait for any acknowledgment from the broker: messages are sent, but there’s no guarantee they were successfully received or stored. This suits high-speed, low-reliability scenarios.

acks=1 (Leader Ack):
The producer waits for an acknowledgment only from the leader replica of the partition. If the leader successfully writes the message, it sends an acknowledgment to the producer. A middle ground between performance and reliability. Suitable for most applications where minimal data loss is acceptable.

acks=all (All Replicas Acknowledgement):
The producer waits for acknowledgment from all in-sync replicas of the partition. This guarantees that the message is stored on every in-sync replica before success is confirmed. Critical applications where data loss is unacceptable, such as financial transactions, are good use cases for this setting.

Confluent.Kafka Acks Enum:

namespace Confluent.Kafka
{
    /// <summary>Acks enum values</summary>
    public enum Acks
    {
        /// <summary>All</summary>
        All = -1, // 0xFFFFFFFF
        /// <summary>None</summary>
        None = 0,
        /// <summary>Leader</summary>
        Leader = 1,
    }
}

Example usage:

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9094",
    Acks = Acks.All
};

Shipping Notifications in Action

Imagine we have a scenario where shipping notifications need to be continuously sent to a Kafka topic. Each notification represents an order that has been shipped, containing key details such as the order code, shipment date, carrier name, and tracking number.

In this example, we simulate this scenario by generating real-time shipping notifications and sending them to the Kafka topic in a controlled stream. Each notification is unique, reflecting a real-world event like an e-commerce system tracking shipments. A 1-second delay is added for testing purposes, allowing better observation of the message flow to the topic.

The way we create the topic:

public async Task CreateTopicAsync(string topicName, int numPartitions = 2, short replicationFactor = 1)
{
    using var client = new AdminClientBuilder(new AdminClientConfig
    {
        BootstrapServers = "localhost:9094",
    }).Build();

    try
    {
        var topicSpecification = new TopicSpecification
        {
            Name = topicName,
            NumPartitions = numPartitions,
            ReplicationFactor = replicationFactor
        };

        await client.CreateTopicsAsync([topicSpecification]);
        Console.WriteLine($"Topic '{topicName}' created successfully. \u2728 ");
    }
    catch (CreateTopicsException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

CreateTopicAsync is responsible for creating a Kafka topic with a specified topic name, number of partitions, and replication factor. It uses the AdminClient to communicate with the Kafka cluster, defining the topic configuration in a TopicSpecification object. The NumPartitions parameter determines how the data will be split across partitions, and ReplicationFactor specifies how many copies of the data will be stored across brokers for fault tolerance.
The first example I’m going to show uses a topic with 2 partitions.

var kafkaService = new KafkaService();

var topicName = "order-shipped-notifications";
await kafkaService.CreateTopicAsync(topicName);

while (true)
{
    await kafkaService.SendMessageAsync(
        topicName,
        new OrderShippedNotification
        {
            OrderCode = Guid.NewGuid().ToString(),
            ShippedDate = DateTime.UtcNow,
            CarrierName = "random-carrier",
            TrackingNumber = $"TRK-{Guid.NewGuid().ToString("N").Substring(0, 10)}"
        }
    );
    await Task.Delay(1000);
}

Note: This is just a snippet of the code to demonstrate the main logic for sending shipping notifications. For the full implementation and additional details, you can check out the complete code in my repository. The GitHub link will be shared at the bottom of this post.

public async Task SendMessageAsync(string topicName, OrderShippedNotification notificationEvent)
{
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:9094",
    };

    using var producer = new ProducerBuilder<Null, string>(config).Build();
    try
    {
        var value = JsonSerializer.Serialize(notificationEvent);
        var result = await producer.ProduceAsync(topicName, new Message<Null, string>
        {
            Value = value
        });

        Console.WriteLine($"The message OrderCode:{notificationEvent.OrderCode} sent to Partition:{result.Partition} and Offset:{result.Offset} 🚀");
    }
    catch (ProduceException<Null, string> ex)
    {
        Console.WriteLine($"Failed to deliver message: {ex.Error.Reason}");
    }
}

This SendMessageAsync function sends a serialized OrderShippedNotification to a specified Kafka topic. It creates a Kafka producer, serializes the notification to JSON, and sends it using ProduceAsync, logging the partition and offset upon success. Errors during message delivery are caught and logged with detailed reasons.
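
One variation worth noting (just a sketch, not part of the repo): the producer above sends messages with a Null key, so Kafka distributes them across partitions on its own. If we wanted all events for the same order to land on the same partition, we could use the order code as the message key, since Kafka hashes the key to choose the partition. Reusing config, topicName, and notificationEvent from the method above, that would look roughly like this:

// Hypothetical keyed variant of the ProduceAsync call above: the order code becomes
// the message key, so every event for the same order is hashed to the same partition.
using var keyedProducer = new ProducerBuilder<string, string>(config).Build();

var keyedResult = await keyedProducer.ProduceAsync(topicName, new Message<string, string>
{
    Key = notificationEvent.OrderCode,
    Value = JsonSerializer.Serialize(notificationEvent)
});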

Let’s have a look at the consumer part:

public async Task ConsumeMessages(string topicName, string groupId, CancellationToken cancellationToken)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9094",
        GroupId = groupId,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true
    };
    using var consumer = new ConsumerBuilder<string, string>(config).Build();

    try
    {
        consumer.Subscribe(topicName);
        Console.WriteLine($"Subscribed to topic 🗂 {topicName}");
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationToken);
                var notificationEvent = JsonSerializer.Deserialize<OrderShippedNotification>(consumeResult.Message.Value);
                if (notificationEvent != null)
                {
                    Console.WriteLine($"📥 Consumed message: OrderCode: {notificationEvent.OrderCode}, " +
                        $"Carrier Name: {notificationEvent.CarrierName}, TrackingNumber: {notificationEvent.TrackingNumber}");
                }
            }
            catch (ConsumeException ex)
            {
                Console.WriteLine($"❌ Error consuming message: {ex.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Consumption was canceled.");
    }
    finally
    {
        consumer.Close();
        Console.WriteLine("Consumer closed.");
    }
}

The consumer function, ConsumeMessages, listens to a specified Kafka topic and processes incoming messages for a given consumer group. It deserializes each message's value into an OrderShippedNotification object and logs the order details, such as OrderCode, CarrierName, and TrackingNumber. The method continuously consumes messages until canceled via the CancellationToken, with proper error handling for consumption failures.

Let’s run one producer app that creates a topic (with 2 partitions) and sends messages to it every second, and run two consumers that listen to this topic.

Kafka Demo: 2 Partitions, 1 Producer, 2 Consumers (Same consumer group)

Now, let me ask you this: What do you think will happen if we increase the number of consumer apps for this scenario? For example, if we have more than 2 consumer apps running, will that help share the workload? Will the new consumers read from one of the existing partitions?

The answer is no, it wouldn’t help. In Kafka, the number of consumers in the same consumer group cannot exceed the number of partitions because each partition can only be assigned to one consumer within the same group. Any additional consumers would remain idle since there wouldn’t be any partitions left to assign them to.

This is why it is important to manage the number of consumer apps carefully.

Kafka Demo: 2 Partitions, 1 Producer, 3 Consumers (Same consumer group)

What are Consumer Groups?

A consumer group is a collection of consumers working together to consume messages from a topic. Each consumer in the group is assigned a subset of the topic’s partitions, ensuring that only one consumer processes each message within the group.

Partition Assignments Across Consumer Groups

Let’s have a look at this diagram. The topic contains two partitions, Partition 0 and Partition 1. There are two consumer groups: Consumer Group 1 (with two consumers) and Consumer Group 2 (with one consumer).
Each partition in a topic is assigned to one consumer per group.

For CG-1, Partition 0 is assigned to Consumer 1, and Partition 1 is assigned to Consumer 2. However, this could just as easily be reversed, since Kafka dynamically decides the partition assignment based on the current state of the consumers in the group.

For CG-2, since there’s only one consumer, both Partition 0 and Partition 1 are assigned to Consumer 1 in this group.

I made a few changes to the consumer app to allow the consumer group name to be passed as a parameter. This demonstrates an example of using different consumer groups.
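
The exact change is in the repo, but the idea is roughly the following (a sketch that assumes ConsumeMessages lives on the same KafkaService class and that the group id is passed as the first command-line argument):

// Sketch: read the consumer group id from the command line, with a fallback default.
var groupId = args.Length > 0 ? args[0] : "order-shipped-consumer-group";

var kafkaService = new KafkaService();
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

await kafkaService.ConsumeMessages("order-shipped-notifications", groupId, cts.Token);

// Starting two instances with different group ids, for example
//   dotnet run -- consumer-group-1
//   dotnet run -- consumer-group-2
// makes each group consume every message from the topic independently.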

Kafka Demo: 2 Partitions, 2 Consumer Groups, 2 Consumers

How do Consumer Groups Help Us?

Essentially, consumer groups allow us to replicate the processing of topic data across multiple independent groups, each handling the data for its specific purpose.

In our example, we’re writing a shipping notification to a Kafka topic and consuming it, right? Now imagine we want to send this notification through two separate channels — one for mobile push notifications, and another one for email notifications. Having consumers in different consumer groups enables us to process the same data independently for each channel. That’s why this feature is incredibly useful for handling multiple processing needs simultaneously.

Here I’m sharing the 👉🏻 GitHub repo link so you can try it out.
To see all the examples, you can directly check the master branch.

Thank you for taking the time to read this post! 😊 I’ll continue exploring more Kafka concepts and sharing practical examples to help you on your journey. Stay tuned for the next part of the series — there’s so much more to uncover! 🚀
