Last week I attended to a Kafka workshop and this is my attempt to show you a simple Step by step: Kafka Pub/Sub with Docker and .Net Core tutorial.
Let’s start:
Create a folder for your new project#
Open a command prompt an run
mkdir kafka.pubsub.console
cd kafka.pubsub.console
Create a console project#
dotnet new console
Add the Confluent.Kafka nuget package#
Add the Confluent.Kafka nuget package to your project:
dotnet add package Confluent.Kafka -v 0.9.5
dotnet restore
Replace the contents of Program.cs#
Replace the contents of the Program.cs file with the following code:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace kafka.pubsub.console
{
class Program
{
static void Main(string[] args)
{
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the producer configuration
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
// Create the producer
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
// Send 10 messages to the topic
for (int i = 0; i < 10; i++)
{
var message = $"Event {i}";
var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
}
}
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll();
}
}
}
}
}
Start Kafka with Docker#
You’ll need to add the following address range to your docker unsafe registry: 172.18.0.0/16 Create a docker-compose.yml file with the following contents:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:0.10.2.0
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "testtopic:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
And run the following command:
docker-compose up
It will take a while but you’ll get a working Kafka installation.
Run the program#
Run the program and enjoy!
dotnet run
Get the code and related files here: https://github.com/cmendible/dotnetcore.samples/tree/main/kafka.pubsub.console
Hope it helps!