Kafka Series Vol. 1: What is Kafka?

Fatih Ünlü
9 min read · Jan 13, 2025


Introduction to Kafka Event Streaming

In today’s digital era, data is one of the most valuable resources driving businesses and innovation. To keep up, we constantly develop faster and more reliable technologies for storing and transmitting data. While storing data is crucial, efficient data transmission is equally important.

At its simplest, data transmission means moving information from a source to a target. However, as the number of sources and targets grows, managing this flow becomes increasingly complex because of challenges like protocols, schemas, and data formats. This is where Apache Kafka, a powerful distributed event streaming platform, comes in. Kafka simplifies these complexities, making data streaming seamless, scalable, and reliable.

Data transfer without a streaming platform

What is a distributed event streaming platform?

A distributed event streaming platform is a system that captures, stores and processes real-time data streams across multiple servers to ensure scalability, reliability, and fault tolerance.

Real-time means processing and delivering data with minimal delay, enabling immediate action. Tracking website events is a good example of this.

Decoupling: Sources and targets operate independently, with no direct dependencies between them, which makes systems more flexible and resilient.

Fault Tolerance: Kafka can run multiple brokers and replicate data across them to prevent data loss. This keeps the platform highly available, so operations can continue even if a broker fails.

Scalability: Data and load are spread across brokers and partitions, so the cluster can grow horizontally simply by adding more of them.

Even though Kafka is one of the most well-known event data streaming platforms, Amazon Kinesis, RabbitMQ, Pulsar, and Pravega are also widely used alternatives, each offering unique features and capabilities tailored to different use cases.

Brief history

Kafka was developed at LinkedIn in 2010 to handle large amounts of real-time data, such as user actions and system logs. It was built to be fast, reliable, and scalable, and it quickly became an important tool at LinkedIn.
A year later, it was released as open source under the Apache Software Foundation, where it gained popularity across many industries. Today it is a very popular platform with a large community.

Let’s take a deeper look.

Topics

Kafka topics are named streams used to organize messages. Each topic has a unique name across the entire Kafka cluster.

  • Kafka topics are somewhat like database tables, but they lack data verification/validation features,
  • Topics can have multiple partitions, enabling Kafka to scale horizontally and allowing for efficient parallel data processing,
  • Kafka accepts messages in any format (e.g., JSON, Avro, plain text),
  • Once data is written to a topic, it cannot be changed; records and their offsets are immutable, and reading a message does not delete it,
  • By default, Kafka retains data for one week, but this retention period can be configured per topic (e.g., retention.ms=86400000 for 1 day).
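As a quick preview of the Confluent.Kafka AdminClient we will use later in this post, here is a minimal sketch of creating a topic with a custom retention period and multiple partitions. The topic name, partition count, and retention value are example values only:

using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var admin = new AdminClientBuilder(new AdminClientConfig
{
    BootstrapServers = "localhost:9094"
}).Build();

await admin.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name = "orders",                  // example topic name
        NumPartitions = 3,                // several partitions for parallel processing
        ReplicationFactor = 1,            // single broker in the demo setup used later
        Configs = new Dictionary<string, string>
        {
            ["retention.ms"] = "86400000" // keep data for 1 day instead of the default 7 days
        }
    }
});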

A partition is a smaller, ordered subset of a topic that helps distribute and store messages across brokers, allowing for parallel processing and scalability.

It is good to know that ordering is guaranteed within a partition but not across partitions. So partition-0 offset 8 is newer than partition-0 offset 7. However, we cannot determine whether partition-0 offset 7 or partition-2 offset 7 is newer, as offsets are only comparable within the same partition.

Brokers

A broker is a Kafka server responsible for storing topic partitions and managing data exchange between producers and consumers.

  • It acts like a server in the Kafka cluster.
  • Each broker is uniquely identified by its broker ID.
  • Each broker holds a subset of topic partitions.
  • Data and partitions are distributed across all brokers, enabling horizontal scaling.
  • By adding more brokers and partitions, Kafka can handle larger volumes of data and achieve greater scalability.
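If you are curious which brokers a cluster has and what their IDs are, the AdminClient can fetch cluster metadata. A minimal sketch, again with example values for the address and timeout:

using Confluent.Kafka;

using var admin = new AdminClientBuilder(new AdminClientConfig
{
    BootstrapServers = "localhost:9094" // example address, matching the demo setup later in this post
}).Build();

// Fetch cluster metadata and print each broker's unique ID and address.
var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var broker in metadata.Brokers)
{
    Console.WriteLine($"Broker {broker.BrokerId} at {broker.Host}:{broker.Port}");
}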

Producers

  • Producers write data to Kafka topics.
  • Producers know which partition to write to and the Kafka broker that holds it.
  • If a Kafka broker fails, producers automatically recover and continue sending data.
  • Producers can optionally send a key with each message:
    => If the key is null, data is sent in a round-robin manner across partitions, providing load balancing.
    => If the key is not null, data always goes to the same partition, determined by hashing the key.
  • Kafka only accepts and outputs bytes,
  • To send objects, they must first be serialized (converted to bytes).
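To see the key-to-partition behavior in action, here is a minimal sketch using the Confluent.Kafka producer we will set up later in this post. Messages that share the same key should land in the same partition; the topic name, key, and values are illustrative only:

using Confluent.Kafka;

using var producer = new ProducerBuilder<string, string>(new ProducerConfig
{
    BootstrapServers = "localhost:9094" // example address
}).Build();

// Two messages sharing the same key: the default partitioner hashes the key,
// so both should end up in the same partition of the topic.
var first = await producer.ProduceAsync("orders",
    new Message<string, string> { Key = "customer-42", Value = "order-1" });
var second = await producer.ProduceAsync("orders",
    new Message<string, string> { Key = "customer-42", Value = "order-2" });

Console.WriteLine($"order-1 -> partition {first.Partition}, offset {first.Offset}");
Console.WriteLine($"order-2 -> partition {second.Partition}, offset {second.Offset}");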

Consumers

  • Consumers automatically know which broker and partition to read data from.
  • If a Kafka broker fails, consumers can recover and continue reading data without losing progress.
  • Data is read in order within each partition, from the lowest offset to the highest by default, though the starting offset can be changed.
  • Consumers must deserialize (transform) the data from bytes back into a specific object in order to process it.

Note!
Serialization/Deserialization types should remain consistent throughout a topic’s lifecycle.
If the format needs to change, it’s better to create a new topic instead of modifying the existing one.
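One simple way to keep the format consistent is to serialize objects to JSON strings on the producer side and deserialize them back on the consumer side, relying on the built-in string (de)serializers. A minimal sketch, where OrderPlaced and OrderSerde are hypothetical names used only for illustration:

using System.Text.Json;
using Confluent.Kafka;

// Hypothetical event type, for illustration only.
public record OrderPlaced(string OrderId, decimal Amount);

public static class OrderSerde
{
    // Producer side: object -> JSON string (the built-in string serializer then turns it into UTF-8 bytes).
    public static Message<string, string> ToMessage(OrderPlaced order) =>
        new() { Key = order.OrderId, Value = JsonSerializer.Serialize(order) };

    // Consumer side: UTF-8 bytes -> string -> object.
    public static OrderPlaced? FromMessage(Message<string, string> message) =>
        JsonSerializer.Deserialize<OrderPlaced>(message.Value);
}

Keeping both sides on string keys and values means the Kafka plumbing stays simple; only the JSON mapping needs to agree.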

Consumer groups are a crucial aspect of Kafka consumers. I’ll provide a detailed explanation with a good example in my next post.

An Example

Let’s make a very simple example by creating a solution that has 2 console projects in it(.Net 8), one for the producer part, and the other one for the consumer part.

To run Kafka on your machine, we can use a docker-compose.yml file like this:

version: '3'

networks:
  app-tier:
    driver: bridge

services:
  kafka:
    image: 'bitnami/kafka:3.7' # can be => 'bitnami/kafka:latest' as well
    ports:
      - '9094:9094'
    networks:
      - app-tier
    volumes:
      - 'data:/bitnami'
    environment:
      # Kafka Settings
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      # Listeners
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER

volumes:
  data:
    driver: local

Basically this docker-compose.yml file creates one Kafka v3.7 container with these env variables:

KAFKA_CFG_NODE_ID: With this configuration, we assign a unique ID to the broker. While the specific ID value is not critical for this example, it’s important to note that in scenarios with multiple Kafka brokers, each broker must be assigned a unique ID to ensure proper identification and functionality.

KAFKA_CFG_PROCESS_ROLES: Kafka nodes can take on multiple roles; a single node can act as a controller and a broker at the same time.

  • Controller: manages the cluster, for example electing partition leaders.
  • Broker: serves producers/consumers and handles data storage.

KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: With this config, we define which nodes can vote during controller elections. The format is:
<brokerId>@<address-where-controller-listens>

KAFKA_CFG_LISTENERS: With this config, we set up the network addresses of listeners for the Kafka broker.

  • PLAINTEXT://:9092: For regular producer/consumer communication.
  • CONTROLLER://:9093: For controller-to-controller communication.
  • EXTERNAL://:9094: For external clients outside the Docker network; this is why we added the ports mapping to the docker-compose.yml file. We will be using this port when we use a UI for Kafka.

KAFKA_CFG_ADVERTISED_LISTENERS: This config advertises the addresses clients should use to connect. We use kafka:9092 for clients inside the Docker network and localhost:9094 for clients outside Docker.

Note!

PLAINTEXT is a security protocol that provides an unauthenticated, non-encrypted channel.
Security protocol Docs

KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: It maps each listener to a security protocol.

KAFKA_CFG_CONTROLLER_LISTENER_NAMES: Specifies which listener to use for controller communication.

If you want to change the docker-compose.yml file, you can find more information at this link.

Let’s create the container,

 docker-compose up -d 

and add the Confluent.Kafka package to both of the projects:

dotnet add package Confluent.Kafka --version 2.8.0

Once we run the Docker container, we could create topics using the Kafka CLI, but instead we’ll write a C# function and use it to create topics.

using Confluent.Kafka;
using Confluent.Kafka.Admin;

public class KafkaService
{
    public async Task CreateTopic(string topicName)
    {
        using var client = new AdminClientBuilder(new AdminClientConfig()
        {
            BootstrapServers = "localhost:9094",
        }).Build();

        try
        {
            await client.CreateTopicsAsync([
                new TopicSpecification(){ Name = topicName }
            ]);

            Console.WriteLine($"{topicName} is successfully created ✅");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

So, what we are doing here is creating a Kafka admin client by giving it the address of our broker (localhost:9094 here).
Then we call the CreateTopicsAsync function to create a topic on the cluster. If a topic with the given name already exists, it throws an error. We can set some configs while creating topics, but we’ll skip them for now.

Let’s create an instance of the KafkaService class we just wrote, and then call CreateTopic twice:

var topicName = "topic-" + DateTime.Now.ToString("yyyyMMdd");
var kafkaService = new KafkaService();
await kafkaService.CreateTopic(topicName);
await kafkaService.CreateTopic(topicName);

If there are no connection issues, the first call logs a success message and the second one logs an error because the topic already exists.

So we created our first topic successfully; now let’s send a message to it. (There are several configurations we will discuss later, like choosing a key for partitioning, serialization settings, etc.)

Let’s add this SendMessage function to send a message to our Kafka topic:

public async Task SendMessage(string topicName, string key, string value)
{
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:9094"
    };

    using var producer = new ProducerBuilder<string, string>(config).Build();

    try
    {
        var result = await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = key,
            Value = value
        });

        Console.WriteLine($"📤 Message delivered to {result.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> ex)
    {
        Console.WriteLine($"Failed to deliver message: {ex.Error.Reason}");
    }
}

It is pretty similar to the one we have for creating the topic. The ProducerBuilder part is where the producer settings come together.

        using var producer = new ProducerBuilder<string, string>(config).Build();

The config part holds the connection details we created earlier.
<string, string> means the key and value are both strings. Later we are going to change this bit to send an object.
Build() creates the producer instance, and we use using here to prevent resource leaks.
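As a small preview of what that change could look like, here is a sketch that plugs a custom value serializer into the ProducerBuilder so the producer accepts an object directly. JsonValueSerializer, OrderPlaced, and ObjectProducerFactory are hypothetical names, and this is just one possible approach:

using System.Text;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical value type (same illustrative record as in the earlier sketch).
public record OrderPlaced(string OrderId, decimal Amount);

public class JsonValueSerializer<T> : ISerializer<T>
{
    // Called by the producer to turn each value into the bytes Kafka actually stores.
    public byte[] Serialize(T data, SerializationContext context) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data));
}

public static class ObjectProducerFactory
{
    // Builds a producer that accepts OrderPlaced values directly instead of pre-serialized strings.
    public static IProducer<string, OrderPlaced> Create(ProducerConfig config) =>
        new ProducerBuilder<string, OrderPlaced>(config)
            .SetValueSerializer(new JsonValueSerializer<OrderPlaced>())
            .Build();
}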

Let’s update the program.cs with this code to create a topic and send a message:

var topicName = "topic-" + DateTime.Now.ToString("yyyyMMdd");
var kafkaService = new KafkaService();
await kafkaService.CreateTopic(topicName);
await kafkaService.SendMessage(topicName, "key-hello", "value-world");

Once we run this code, we should see the topic created and the message delivered.
(If you already created the same topic, you’ll see an error for the topic creation step instead, but the message should still be sent.)

Okay, so we sent our simple message to the topic; now let’s consume it. To do so, create a KafkaService class in the consumer project and add this:

using Confluent.Kafka;

namespace KafkaDemo.Consumer;

public class KafkaService
{
    public async Task ConsumeMessages(string topicName, CancellationToken cancellationToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9094",
            GroupId = "consumer-group-one",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true
        };

        using var consumer = new ConsumerBuilder<string, string>(config).Build();

        try
        {
            consumer.Subscribe(topicName);

            Console.WriteLine($"Subscribed to topic 🗂 {topicName}");
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"📥 Consumed message: {consumeResult.Message.Key} - {consumeResult.Message.Value} " +
                                      $"from topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}");
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"❌ Error consuming message: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumption was canceled.");
        }
        finally
        {
            consumer.Close();
            Console.WriteLine("Consumer closed.");
        }
    }
}

and program.cs should be like this:

using KafkaDemo.Consumer;

Console.WriteLine("Consumer started!");

var cts = new CancellationTokenSource();
try
{
    var kafkaService = new KafkaService();
    var topicName = "topic-" + DateTime.Now.ToString("yyyyMMdd");
    await kafkaService.ConsumeMessages(topicName, cts.Token);
}
catch (Exception ex)
{
    Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
    cts.Cancel();
}
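One small detail: in the code above, the token is only cancelled after ConsumeMessages returns, so the loop keeps running until the process is killed. A minimal sketch (just one possible approach) of wiring Ctrl+C to the CancellationTokenSource so the consume loop can exit and Close() gets called:

// Right after creating the CancellationTokenSource in program.cs:
var cts = new CancellationTokenSource();

// Cancel the token on Ctrl+C instead of killing the process immediately,
// so the finally block in ConsumeMessages can call consumer.Close().
Console.CancelKeyPress += (_, eventArgs) =>
{
    eventArgs.Cancel = true; // keep the process alive long enough to shut down gracefully
    cts.Cancel();
};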

The result: the consumer subscribes to the topic and prints the messages we produced earlier, and it keeps listening for new messages on the topic. We can re-run the producer to send more messages and see them appear in the consumer app too.

Here I’m sharing the GitHub repo link so you can try it out. I’ve also added a few extra helpful functions, like RemoveTopic, ListTopics, and ListTopicPartitions.

Thank you so much for checking out my project! 😊 I’ll keep adding more features and sharing updates to make it even more useful. Stay tuned for more posts packed with examples and detailed explanations — there’s a lot more to come! 🚀
