Kafka Series Vol. 3: Handling Follow Events in Social Media
In my previous post, “Kafka Series Vol. 2: From Serialization to Consumer Groups”, I explained how to serialize and deserialize data, manage acks, and use consumer groups in Kafka.
In this article, I will show you how to use Kafka UI, and walk through handling high-frequency follow actions between users on a social media platform, ensuring updates are processed smoothly without degrading the user experience.
Kafka UI by Provectus
Apache Kafka is a powerful tool for data streaming, but managing and monitoring it can be complex. That’s where Kafka UIs come in. In this article, we’ll take a look at Kafka UI by Provectus, one of the most popular Kafka UIs, and also walk through an example of using Kafka with MongoDB.
UI for Apache Kafka is a versatile, fast, and lightweight web UI for managing Apache Kafka® clusters. Built by developers, for developers.
Kafka UI is a tool designed to help you easily manage and monitor your Apache Kafka clusters. It provides a straightforward dashboard where you can see important information about your Kafka operations.
Kafka UI is a free, open-source web UI. Here is the GitHub link.
Here is what we can do with Kafka UI:
- Monitor in real time: keep an eye on how well our Kafka cluster is performing with live updates.
- Manage clusters: adjust settings and manage data flow without needing deep technical knowledge.
- Stay secure: handle security settings and keep track of compliance with built-in features.
In our previous example, we had a Docker Compose file that ran a Kafka instance. Let’s keep that part the same, but add kafka-ui to monitor the Kafka cluster.
version: '3'
networks:
  app-tier:
    driver: bridge
services:
  kafka:
    image: 'bitnami/kafka:3.7' # You can use 'bitnami/kafka:latest' as well
    ports:
      - '9094:9094'
    networks:
      - app-tier
    volumes:
      - 'data:/bitnami'
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    ports:
      - '8080:8080'
    networks:
      - app-tier
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
volumes:
  data:
    driver: local
- KAFKA_CLUSTERS_0_NAME: We set this to “local”, which is the name assigned to our Kafka cluster within the UI.
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: This points to kafka:9092, the internal network address and port of the Kafka broker, ensuring that Kafka UI can communicate with it.
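Kafka UI can monitor more than one cluster at a time; each additional cluster gets its own index in these environment variable names. As a sketch, a hypothetical second cluster (the `staging` name and address are placeholders, not part of our setup) could be registered like this:

```yaml
environment:
  KAFKA_CLUSTERS_0_NAME: local
  KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  # Hypothetical second cluster; adjust the name and address to your setup.
  KAFKA_CLUSTERS_1_NAME: staging
  KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka-staging:9092
```

Both clusters would then appear side by side in the same dashboard.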
If we run this docker-compose file successfully, we should be able to access Kafka UI at http://localhost:8080.
Here we see a Broker Count of 1, because that is how we configured it in our docker-compose.yml file. However, we can always increase this number if needed.
Now, let’s create a topic and send a message to it four times. I reused the example from the first post in this series; here is the link.
We can see that topic-20250207 was created there, with 1 partition and a replication factor of 1.
URP (Under-Replicated Partitions): Indicates whether all replicas of all partitions are in sync. Here it shows 0, which means there are no under-replicated partitions and all data is fully replicated.
In-Sync Replicas: The count of replicas currently in sync, shown here as 1 of 1.
Message Count: The total number of messages stored in this topic; it shows 4, matching the 4 messages we sent.
Clean-Up Policy: The policy for cleaning up old data; here it is set to DELETE, which means data will be deleted according to the retention settings.
Okay, let’s run the consumer app to consume all the messages and check what changes in the UI.
__consumer_offsets: This is an internal system topic that Kafka uses to keep track of where each consumer group is in reading the messages of other topics.
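Conceptually, you can think of the data in __consumer_offsets as a map from (consumer group, topic, partition) to the last committed offset. Here is a toy sketch of that idea (the group name is illustrative, and this is not Kafka’s actual storage format — Kafka stores these as keyed records in a compacted internal topic):

```csharp
using System;
using System.Collections.Generic;

// Toy model of what __consumer_offsets tracks: the last committed
// offset per (group, topic, partition).
var committedOffsets = new Dictionary<(string Group, string Topic, int Partition), long>();

// Our consumer group commits after reading the 4 messages in the example.
committedOffsets[("example-group", "topic-20250207", 0)] = 4;

// On restart, the group resumes from the committed offset instead of offset 0.
long resumeFrom = committedOffsets[("example-group", "topic-20250207", 0)];
Console.WriteLine($"Resuming partition 0 from offset {resumeFrom}");
```

This is why, after the consumer app runs, the UI shows the group’s position in the topic.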
An Example: Simplifying Social Media Interactions with Kafka and MongoDB
In the dynamic world of social media platforms, efficiently managing user interactions like “follow” events can be challenging. These events occur frequently and at scale, requiring a system that can handle high throughput and provide immediate feedback to users.
Let’s imagine we have a social media platform where users frequently follow each other to stay updated with one another’s activities. Every time a user decides to follow someone, not only does the follower count need updating, but the system also needs to ensure that the new follower receives updates about the followee’s activities. This requires a robust and scalable system capable of handling these changes efficiently, without impacting the user experience negatively.
- Event Generation: When a user follows another user, the event is produced to a Kafka topic named user-follows.
- Processing with Kafka: A consumer reads the event from the topic and processes it asynchronously.
- Updating MongoDB: Once an event is consumed from Kafka, the corresponding user document in MongoDB is updated. (If the user doesn’t exist, it is created with the new follower added to the followers list.)
- Immediate Feedback via Kafka UI: Developers and administrators can monitor Kafka topics and the flow of events through Kafka UI.
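The flow above assumes a small event payload carrying the follower and followee IDs. A minimal model, with property names matching the producer and consumer snippets in this post, could look like this, with the JSON round trip that happens between the producer and the consumer:

```csharp
using System;
using System.Text.Json;

// Producer side: serialize the event to JSON before sending it to Kafka.
var evt = new FollowerEvent { FollowerId = 3, FolloweeId = 21 };
string json = JsonSerializer.Serialize(evt);
Console.WriteLine(json); // {"FollowerId":3,"FolloweeId":21}

// Consumer side: deserialize the message back into the event type.
var roundTripped = JsonSerializer.Deserialize<FollowerEvent>(json);
Console.WriteLine($"User {roundTripped!.FollowerId} followed user {roundTripped.FolloweeId}");

// Minimal event model carrying the two IDs used throughout this example.
public class FollowerEvent
{
    public int FollowerId { get; set; }
    public int FolloweeId { get; set; }
}
```

Keeping the payload this small means each follow action is cheap to produce, and the heavier work (the database update) happens asynchronously on the consumer side.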
Let’s update the docker-compose.yml file first to have MongoDB:
version: '3'
networks:
  app-tier:
    driver: bridge
services:
  kafka:
    image: 'bitnami/kafka:3.7'
    ports:
      - '9094:9094'
    networks:
      - app-tier
    volumes:
      - 'data:/bitnami'
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    ports:
      - '8080:8080'
    networks:
      - app-tier
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  mongo_db:
    image: 'mongo:latest'
    ports:
      - '27017:27017'
    networks:
      - app-tier
    volumes:
      - 'mongo-data:/data/db'
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
volumes:
  data:
    driver: local
  mongo-data:
    driver: local
The producer continuously simulates user-follow events by sending messages to a Kafka topic in an infinite loop. Each message contains randomized follower and followee IDs, sent every second to mimic real user interactions. This helps test the system’s response to a continuous data flow, ensuring it can handle frequent updates without degrading performance.
while (true)
{
    await kafkaService.SendMessageAsync(
        topicName,
        new FollowerEvent
        {
            FollowerId = rand.Next(1, 10),
            FolloweeId = rand.Next(20, 30),
        }
    );
    await Task.Delay(1000);
}
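One detail worth noting: Random.Next(minValue, maxValue) treats the upper bound as exclusive, so the loop above generates follower IDs in 1–9 and followee IDs in 20–29 (never 10 or 30). A quick sketch that checks the ranges empirically:

```csharp
using System;

var rand = new Random();
int minFollower = int.MaxValue, maxFollower = int.MinValue;
int minFollowee = int.MaxValue, maxFollowee = int.MinValue;

// Sample many draws to observe the ranges actually produced.
for (int i = 0; i < 10_000; i++)
{
    int followerId = rand.Next(1, 10);   // 1..9 (10 is excluded)
    int followeeId = rand.Next(20, 30);  // 20..29 (30 is excluded)
    minFollower = Math.Min(minFollower, followerId);
    maxFollower = Math.Max(maxFollower, followerId);
    minFollowee = Math.Min(minFollowee, followeeId);
    maxFollowee = Math.Max(maxFollowee, followeeId);
}

Console.WriteLine($"Follower IDs span {minFollower}..{maxFollower}");
Console.WriteLine($"Followee IDs span {minFollowee}..{maxFollowee}");
```

Since the two ranges don’t overlap, the simulation never produces a user following themselves.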
In the consumer, the ProcessMessage function deserializes the incoming JSON message into a FollowerEvent object.
private void ProcessMessage(string message)
{
    try
    {
        var followerEvent = JsonSerializer.Deserialize<FollowerEvent>(message);
        if (followerEvent == null) return;

        _userService.AddFollower(followerEvent.FolloweeId, followerEvent.FollowerId);
        Console.WriteLine($"📥 Consumed message: {message}");
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }
}
The consumer then invokes the AddFollower method of the UserService to update or create follower data in the database, effectively adding a follower to the specified user's follower list. This method uses MongoDB's upsert functionality to either update an existing user or create a new one if the user does not exist.
public void AddFollower(int userId, int followerId)
{
    var filter = Builders<User>.Filter.Eq(u => u.Id, userId);
    var update = Builders<User>.Update.Push(u => u.Followers, new Follower
    {
        FollowerId = followerId,
        FollowedAt = DateTime.UtcNow
    });
    var options = new UpdateOptions { IsUpsert = true };

    var result = _usersCollection.UpdateOne(filter, update, options);

    if (result.UpsertedId != null)
    {
        Console.WriteLine($"A new user was created with the ID: {result.UpsertedId}");
    }
    else if (result.ModifiedCount == 0)
    {
        Console.WriteLine("No user was updated. Check if the user ID is correct.");
    }
    else
    {
        Console.WriteLine("User updated successfully.");
    }
}
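After a few events are processed, a user document would take roughly this shape (illustrative values; this assumes the User model maps its Id property to Mongo's _id, which is the C# driver's convention):

```json
{
  "_id": 21,
  "Followers": [
    { "FollowerId": 3, "FollowedAt": "2025-02-07T10:15:00Z" },
    { "FollowerId": 7, "FollowedAt": "2025-02-07T10:15:03Z" }
  ]
}
```

One design note: Update.Push appends unconditionally, so if an event is redelivered the same follower could appear in the array twice; in a production system, some form of deduplication (for example, checking for the FollowerId before pushing) would be worth considering.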
Now let’s run docker-compose and then start the producer and the consumer apps. (I stopped the producer after it had sent 25 events.)
Here is the result,
So we can see that 25 events were distributed across the partitions: partition-0 received 17 and partition-1 received 8. If we go to the Messages tab, we can see the messages that have been sent to the topic:
Finally, let’s take a look at our MongoDB. I will be using Mongo Compass UI for this, but you can explore other alternatives as well. If we look at the database, we can see the number of followers for each user, stored as an array within the user’s document.
Try It Yourself
Here I’m sharing the 👉🏻 GitHub repo link so you can try it out.
To see all the examples, you can directly check the master branch.
Thank you!
Thank you so much for checking out my project! 😊 I’ll continue adding new features and improvements to make it even more valuable. Stay tuned for more posts packed with examples and detailed explanations — there’s a lot more to come! 🚀