Last week I attended a Kafka workshop, and this is my attempt to show you a simple step-by-step tutorial: Kafka Pub/Sub with Docker and .NET Core.

Let’s start:

Create a folder for your new project


Open a command prompt and run:

mkdir kafka.pubsub.console
cd kafka.pubsub.console

Create a console project


dotnet new console

Add the Confluent.Kafka nuget package


Add the Confluent.Kafka nuget package to your project:

dotnet add package Confluent.Kafka -v 0.9.5
dotnet restore

Replace the contents of Program.cs


Replace the contents of the Program.cs file with the following code:

 using System;
 using System.Collections.Generic;
 using System.Text;
 using System.Threading.Tasks;
 using Confluent.Kafka;
 using Confluent.Kafka.Serialization;

 namespace kafka.pubsub.console
 {
     /// <summary>
     /// Console sample that publishes ten string events to a Kafka topic and then
     /// consumes from the same topic until Ctrl-C is pressed.
     /// </summary>
     class Program
     {
         /// <summary>
         /// Entry point: sends the events with a producer, then runs the consumer poll loop.
         /// </summary>
         /// <param name="args">Command-line arguments (not used).</param>
         static void Main(string[] args)
         {
             // The Kafka endpoint address
             string kafkaEndpoint = "127.0.0.1:9092";

             // The Kafka topic we'll be using
             string kafkaTopic = "testtopic";

             // Create the producer configuration
             var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

             // Create the producer (no key serializer because the key type is Null)
             using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
             {
                 // Send 10 messages to the topic
                 for (int i = 0; i < 10; i++)
                 {
                     var message = $"Event {i}";

                     // Main is synchronous, so block on each send; this also keeps the
                     // console output in the same order as the events.
                     var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                     Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
                 }
             }

             // Create the consumer configuration
             // NOTE(review): no "auto.offset.reset" is configured, so a brand-new consumer
             // group presumably starts at the end of the topic and will not see the ten
             // events sent above — confirm against the client defaults.
             var consumerConfig = new Dictionary<string, object>
             {
                 { "group.id", "myconsumer" },
                 { "bootstrap.servers", kafkaEndpoint },
             };

             // Create the consumer
             using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
             {
                 // Subscribe to the OnMessage event
                 consumer.OnMessage += (obj, msg) =>
                 {
                     Console.WriteLine($"Received: {msg.Value}");
                 };

                 // Subscribe to the Kafka topic
                 consumer.Subscribe(new List<string>() { kafkaTopic });

                 // Handle Cancel Keypress
                 var cancelled = false;
                 Console.CancelKeyPress += (_, e) =>
                 {
                     e.Cancel = true; // prevent the process from terminating.
                     cancelled = true;
                 };

                 Console.WriteLine("Ctrl-C to exit.");

                 // Poll for messages. A finite timeout is used so the loop re-checks the
                 // cancel flag regularly; the parameterless Poll() may wait until the next
                 // message arrives, which would keep Ctrl-C from ending the loop on an
                 // idle topic.
                 while (!cancelled)
                 {
                     consumer.Poll(TimeSpan.FromMilliseconds(100));
                 }
             }
         }
     }
 }

Start Kafka with Docker


You’ll need to add the following address range to your Docker insecure registries: 172.18.0.0/16. Then create a docker-compose.yml file with the following contents:

 # Single Kafka broker plus ZooKeeper for running the sample locally.
 version: '2'
 services:
   zookeeper:
     image: wurstmeister/zookeeper
     ports:
       - "2181:2181"
   kafka:
     image: wurstmeister/kafka:0.10.2.0
     ports:
       - "9092:9092"
     environment:
       # Host name the broker advertises; matches the 127.0.0.1:9092 endpoint used by the console program.
       KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
       # Topic created at startup — presumably name:partitions:replicas; confirm against the image docs.
       KAFKA_CREATE_TOPICS: "testtopic:1:1"
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock

And run the following command:

docker-compose up

It will take a while but you’ll get a working Kafka installation.

Run the program


Run the program and enjoy!

dotnet run

Get the code and related files here: https://github.com/cmendible/dotnetcore.samples/tree/main/kafka.pubsub.console

Hope it helps!

Learn More