Last week I attended a Kafka workshop, and this is my attempt to show you a simple step-by-step tutorial: Kafka Pub/Sub with Docker and .NET Core.
Let’s start:
Create a folder for your new project
Open a command prompt and run:
mkdir kafka.pubsub.console
cd kafka.pubsub.console
Create a console project
dotnet new console
Add the Confluent.Kafka NuGet package
Add the Confluent.Kafka NuGet package to your project:
dotnet add package Confluent.Kafka -v 0.9.5
dotnet restore
Replace the contents of Program.cs
Replace the contents of the Program.cs file with the following code:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace kafka.pubsub.console
{
    class Program
    {
        static void Main(string[] args)
        {
            // The Kafka endpoint address
            string kafkaEndpoint = "127.0.0.1:9092";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the producer configuration
            var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

            // Create the producer
            using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
            {
                // Send 10 messages to the topic
                for (int i = 0; i < 10; i++)
                {
                    var message = $"Event {i}";
                    var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                    Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
                }
            }

            // Create the consumer configuration
            var consumerConfig = new Dictionary<string, object>
            {
                { "group.id", "myconsumer" },
                { "bootstrap.servers", kafkaEndpoint },
            };

            // Create the consumer
            using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Subscribe to the OnMessage event
                consumer.OnMessage += (obj, msg) =>
                {
                    Console.WriteLine($"Received: {msg.Value}");
                };

                // Subscribe to the Kafka topic
                consumer.Subscribe(new List<string>() { kafkaTopic });

                // Handle Cancel Keypress
                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                Console.WriteLine("Ctrl-C to exit.");

                // Poll for messages
                while (!cancelled)
                {
                    consumer.Poll();
                }
            }
        }
    }
}
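One thing to keep in mind: the program sends its ten events before the consumer subscribes, and a consumer group without committed offsets normally starts reading at the end of the topic. If you don't see the events on the first run, a possible tweak is to ask the consumer to start from the earliest available offset. The sketch below only builds the configuration dictionary. The `ConsumerConfigSketch` class and its `Build` method are names I made up for illustration, and I'm assuming the 0.9.x client accepts topic-level defaults through a nested `default.topic.config` entry, so check this against the client version you installed.

```csharp
using System;
using System.Collections.Generic;

static class ConsumerConfigSketch
{
    // Hypothetical helper, not part of the original sample.
    // Returns a consumer configuration that starts from the earliest offset
    // when the consumer group has no committed offset yet.
    public static Dictionary<string, object> Build(string kafkaEndpoint)
    {
        return new Dictionary<string, object>
        {
            { "group.id", "myconsumer" },
            { "bootstrap.servers", kafkaEndpoint },
            {
                // Assumption: the 0.9.x client passes this nested dictionary
                // to librdkafka as the default topic configuration.
                "default.topic.config", new Dictionary<string, object>
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };
    }

    static void Main()
    {
        var config = Build("127.0.0.1:9092");

        // Print the top-level keys to show what would be handed to the consumer.
        foreach (var entry in config)
        {
            Console.WriteLine(entry.Key);
        }
    }
}
```

To try it, you could pass the result of `Build(kafkaEndpoint)` to the `Consumer<Null, string>` constructor in place of `consumerConfig`.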
Start Kafka with Docker
You’ll need to add the following address range to your Docker insecure registries: 172.18.0.0/16
Create a docker-compose.yml file with the following contents:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:0.10.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "testtopic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
And run the following command:
docker-compose up
It will take a while but you’ll get a working Kafka installation.
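Since the containers take a while to start, it can help to check that something is listening on the broker port before running the program. Here is a minimal sketch of such a check as a separate little console program. The `BrokerCheck` class and the `IsPortOpen` method are hypothetical names, and the two-second timeout is an arbitrary choice. An open port only tells you the container accepts TCP connections, not that Kafka has finished starting or that the topic exists.

```csharp
using System;
using System.Net.Sockets;

class BrokerCheck
{
    // Hypothetical helper: returns true if a TCP connection to host:port
    // succeeds within the given timeout.
    static bool IsPortOpen(string host, int port, TimeSpan timeout)
    {
        try
        {
            using (var client = new TcpClient())
            {
                // Wait returns false if the attempt did not finish in time.
                return client.ConnectAsync(host, port).Wait(timeout) && client.Connected;
            }
        }
        catch (Exception)
        {
            // Connection refused or host unreachable: treat it as "not ready yet".
            return false;
        }
    }

    static void Main()
    {
        // Same endpoint the tutorial program uses.
        bool open = IsPortOpen("127.0.0.1", 9092, TimeSpan.FromSeconds(2));
        Console.WriteLine(open
            ? "Port 9092 is accepting connections."
            : "Nothing is listening on port 9092 yet.");
    }
}
```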
Run the program
Run the program and enjoy!
dotnet run
Get the code and related files here: https://github.com/cmendible/dotnetcore.samples/tree/main/kafka.pubsub.console
Hope it helps!