Unified access layer for Kafka resources: Part 2: Rule Based Routing Layer
Introduction
This is the Part 2 which is a follow up of Part 1 of the Unified Access Layer for Kafka Resources where we dive deep into the design and implementation of discovering Kafka resources via a rule based routing layer.
Approach 1: Discover topics through a rule based routing layer
Just to recap, here we are trying to build the pattern where a language specific SDK handles the Kafka resources discovery by talking to a catalog service.
Let’s refer the overall architecture diagram:
Implementation
Catalog Service
This represents the catalog of all the Kafka resources that are needed by Clients (Producers and Consumers). This is implemented in the Kong API Gateway with
- A service that represents the Catalog Service upstream
- A route “/kafka-service-gw” on the service
- A custom plugin (or a pre-functions/post-functions plugin) that has a set of rules configured to retrieve the right Kafka bootstrap servers and topic information based on a set of query parameters (Channel, ServiceType, Organization). Clients can be either 1 call (ideally) or multiple calls to retrieve the information.
- Auth: all these calls are protected using Basic Auth
The code can be found here
Java Implementation for Kafka Producer and Consumer
Kafka Producer
Like we highlighted above, we need a custom consumer that can call the Catalog Service passing in the required information to locate the bootstrap servers and the topic. These will then be used to produce records (instead of the static configuration of the bootstrap servers and the topic). Here, we create a new ServiceLocatorProduder which is a facade around the KafkaProduder (routes all the calls to the internal Kafka Producer object). It first calls the Catalog Service passing the information needed and obtains the bootstrap servers and the kafka topic.
As you can see below, any configuration change during the send
will trigger an automatic refresh of the bootstrap servers and the topic. After this, everything will work seamlessly as you can see the execution.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class BasicInterceptor implements Interceptor {
String credentials;
BasicInterceptor(String id, String password) {
credentials = Credentials.basic(id, password);
}
@NotNull
@Override
public Response intercept(@NotNull Chain chain) throws IOException {
Request request = chain.request();
Request.Builder builder = request.newBuilder().header("Authorization", credentials);
return chain.proceed(builder.build());
}
}
public class ServiceLocatorProducer<K,V> implements Producer {
private OkHttpClient client = new OkHttpClient.Builder().build();
private LoadingCache<String, Map<String, String>> cache;
public static final String CACHE_KEY = "service-map";
public static final String KAFKA_TOPIC_KEY = "kafka_topic";
public static final String SERVICE_LOCATOR_BASE_URL = "http://localhost:8000/kafka-service-gw/";
private Properties properties;
private KafkaProducer kafkaProducer;
private void initCache() throws ExecutionException {
cache = CacheBuilder.newBuilder()
.maximumSize(10)
.expireAfterWrite(5, TimeUnit.SECONDS)
.build(
new CacheLoader<String, Map<String, String>>() {
public Map<String, String> load(String id) throws IOException {
final Map<String, String> svcMap = getServiceConfiguration();
return svcMap;
}
}
);
}
private void createProducer(Properties properties) throws ExecutionException {
properties.put(BOOTSTRAP_SERVERS_CONFIG, cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG));
System.out.println(cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG));
this.properties = properties;
kafkaProducer = new KafkaProducer(properties);
}
ServiceLocatorProducer(Properties properties) throws ExecutionException {
initCache();
createProducer(properties);
}
String getTopic() throws ExecutionException {
return cache.get(CACHE_KEY).get(KAFKA_TOPIC_KEY);
}
public Map<String,String> getServiceConfiguration() throws IOException {
OkHttpClient client = new OkHttpClient.Builder()
.callTimeout(5, TimeUnit.MINUTES)
.connectTimeout(5, TimeUnit.MINUTES)
.addInterceptor(new BasicInterceptor("user1", "password1"))
.build();
Map<String, String> kafkaSvcLocMap = new HashMap<>();
String topic = getTopicConfiguraion(client);
kafkaSvcLocMap.put(KAFKA_TOPIC_KEY, topic);
String bootstrapServersConfig = getBootstrapServersConfig(client);
kafkaSvcLocMap.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
return kafkaSvcLocMap;
}
@NotNull
private String getBootstrapServersConfig(OkHttpClient client) throws IOException {
Request request = new Request.Builder()
.url(SERVICE_LOCATOR_BASE_URL + "kafka_clusters?domain=example.org")
.build();
Response response = client.newCall(request).execute();
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
JsonObject jsonObject = JsonParser.parseString(response.body().string()).getAsJsonObject();
System.out.println("bootstrap:" + jsonObject.get("bootstrap_servers").getAsString());
return jsonObject.get("bootstrap_servers").getAsString();
}
@NotNull
private String getTopicConfiguraion(OkHttpClient client) throws IOException {
Request request = new Request.Builder()
.url(SERVICE_LOCATOR_BASE_URL + "channels?channel_name=channel1")
.build();
Response response = client.newCall(request).execute();
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
JsonObject jsonObject = JsonParser.parseString(response.body().string()).getAsJsonObject();
System.out.println(jsonObject.get("resolved_value").getAsString());
JsonObject innerObject = JsonParser.parseString(jsonObject.get("resolved_value").getAsString()).getAsJsonObject();
System.out.println(innerObject.get("topic1"));
return innerObject.get("topic1").getAsString();
}
@Override
public Future<RecordMetadata> send(ProducerRecord producerRecord) {
return send(producerRecord, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
String bootstrapServers = null;
String topic = null;
try {
bootstrapServers = cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG);
topic = cache.get(CACHE_KEY).get(KAFKA_TOPIC_KEY);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
if (this.properties.get(BOOTSTRAP_SERVERS_CONFIG).equals(bootstrapServers) &&
producerRecord.topic().equals(topic)) {
return kafkaProducer.send(producerRecord, callback);
} else {
System.out.printf(
"Need to update bootstrap servers config to %s and the topic config to %s and create a new Producer\n", bootstrapServers, topic);
this.close();
properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProducer = new KafkaProducer(properties);
ProducerRecord newRecord = new ProducerRecord<>(topic, producerRecord.key(), producerRecord.value());
return kafkaProducer.send(newRecord, callback);
}
}
Kafka Producer App
Typically, the producer application would include all the properties needed to successfully publish records to the Kafka cluster (especially the 2 key things: 1/ Bootstrap servers, 2/ Topic). Like shown below, the ServiceLocatorProducer does the magic to get the right bootstrap servers and topic (instead of the commented out static information).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class App {
public static void main(String[] args ) throws Exception, ExecutionException, InterruptedException {
Properties config = new Properties();
// Not adding the bootstrap.servers config because it will be retrieved automatically
config.put(ACKS_CONFIG, "all");
config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
config.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
String [] keys = {"joe", "jill", "justie"};
String [] values = {"crease", "myers", "hill"};
try (final ServiceLocatorProducer<String, String> producer = new ServiceLocatorProducer<>(config)) {
final Random rnd = new Random();
final int numMessages = 10000;
for (int i = 0; i < numMessages; i++) {
String user = keys[rnd.nextInt(keys.length)];
String item = values[rnd.nextInt(values.length)];
// topic is obtained automatically from the producer and updated if it is different
// when we receive the topic from the RecordMetadata received from the send method
AtomicReference<String> topic = new AtomicReference<>(producer.getTopic());
producer.send(
new ProducerRecord<>(topic.get(), user, item),
(event, ex) -> {
if (ex != null)
ex.printStackTrace();
else {
System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
if (!topic.get().equals(event.topic())) {
topic.set(event.topic());
}
}
});
// Only to demonstrate the change in configurations
Thread.sleep(100);
}
System.out.printf("%s events were produced to topic %s%n", numMessages, producer.getTopic());
}
}
}
Kafka Producer in Action
Just showing the excerpts from the full execution here.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
C:\Users\nragh\.jdks\corretto-11.0.22\bin\java.exe "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2023.2.2\lib\idea_rt.jar=55727:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2023.2.2\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\nragh\IdeaProjects\kafka-producer\target\classes;C:\Users\nragh\.m2\repository\org\apache\kafka\kafka-clients\7.0.1-ccs\kafka-clients-7.0.1-ccs.jar;C:\Users\nragh\.m2\repository\com\github\luben\zstd-jni\1.5.0-2\zstd-jni-1.5.0-2.jar;C:\Users\nragh\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;C:\Users\nragh\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.1\snappy-java-1.1.8.1.jar;C:\Users\nragh\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;C:\Users\nragh\.m2\repository\org\apache\commons\commons-lang3\3.14.0\commons-lang3-3.14.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okhttp3\okhttp\4.12.0\okhttp-4.12.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okio\okio\3.6.0\okio-3.6.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okio\okio-jvm\3.6.0\okio-jvm-3.6.0.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-common\1.9.10\kotlin-stdlib-common-1.9.10.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk8\1.8.21\kotlin-stdlib-jdk8-1.8.21.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib\1.8.21\kotlin-stdlib-1.8.21.jar;C:\Users\nragh\.m2\repository\org\jetbrains\annotations\13.0\annotations-13.0.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk7\1.8.21\kotlin-stdlib-jdk7-1.8.21.jar;C:\Users\nragh\.m2\repository\com\google\guava\guava\33.2.1-jre\guava-33.2.1-jre.jar;C:\Users\nragh\.m2\repository\com\google\guava\failureaccess\1.0.2\failureaccess-1.0.2.jar;C:\Users\nragh\.m2\repository\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;C:\Users\nragh\.m2\repository\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;C:\Users\nragh\.m2\repository\org\checkerframework\checker-qual\3.42.0\checker-qual-3.42.0.jar;C:\Users\nragh\.m2\repository\com\google\errorprone\error_prone_annotations\2.26.1\error_prone_annotations-2.26.1.jar;C:\Users\nragh\.m2\repository\com\google\j2objc\j2objc-annotations\3.0.0\j2objc-annotations-3.0.0.jar;C:\Users\nragh\.m2\repository\com\google\code\gson\gson\2.11.0\gson-2.11.0.jar com.platformatory.App
{"topic1": "users", "topic2": "users-1"}
"users"
bootstrap:localhost:9092,localhost:9093,localhost:9094
localhost:9092,localhost:9093,localhost:9094
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Produced event to topic users: key = joe value = myers
Produced event to topic users: key = joe value = crease
Produced event to topic users: key = justie value = myers
Produced event to topic users: key = jill value = hill
Produced event to topic users: key = jill value = hill
Produced event to topic users: key = joe value = hill
Produced event to topic users: key = justie value = hill
Produced event to topic users: key = justie value = crease
Produced event to topic users: key = joe value = myers
...
bootstrap:localhost:9092,localhost:9093
Need to update bootstrap servers config to {localhost:9092,localhost:9093} and the topic config to {users-1} and create a new Producer
Produced event to topic users-1: key = joe value = crease
Produced event to topic users-1: key = justie value = crease
Produced event to topic users-1: key = joe value = myers
Produced event to topic users-1: key = justie value = crease
Produced event to topic users-1: key = justie value = myers
Produced event to topic users-1: key = jill value = hill
Produced event to topic users-1: key = justie value = hill
Produced event to topic users-1: key = joe value = hill
Produced event to topic users-1: key = jill value = myers
Produced event to topic users-1: key = justie value = hill
Produced event to topic users-1: key = jill value = hill
Produced event to topic users-1: key = joe value = hill
...
Kafka Consumer
Just like the facade created by the Kafka Producer, we do the same here called the ServiceLocatorConsumer, which routes all the calls to the internal Kafka Consumer object. Also, just like the ServiceLocatorProducer, it calls the Catalog Service and obtains the same 2 key properties needed to consume data.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class BasicInterceptor implements Interceptor {
String credentials;
BasicInterceptor(String id, String password) {
credentials = Credentials.basic(id, password);
}
@NotNull
@Override
public Response intercept(@NotNull Chain chain) throws IOException {
Request request = chain.request();
Request.Builder builder = request.newBuilder().header("Authorization", credentials);
return chain.proceed(builder.build());
}
}
public class ServiceLocatorConsumer<K, V> implements Consumer<K, V> {
private OkHttpClient client;
private LoadingCache<String, Map<String, String>> cache;
public static final String CACHE_KEY = "service-map";
public static final String KAFKA_TOPIC_KEY = "kafka_topic";
public static final String SERVICE_LOCATOR_BASE_URL = "http://localhost:8000/kafka-service-gw/";
private Properties properties;
private KafkaConsumer kafkaConsumer;
public ServiceLocatorConsumer(Map<String, Object> configs) {
kafkaConsumer = new KafkaConsumer<>(configs);
}
private void initCache() throws ExecutionException {
cache = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(5, TimeUnit.SECONDS)
.build(
new CacheLoader<String, Map<String, String>>() {
public Map<String, String> load(String id) throws IOException {
final Map<String, String> svcMap = getServiceConfiguration();
return svcMap;
}
}
);
}
private void createConsumer(Properties properties) throws ExecutionException {
properties.put(BOOTSTRAP_SERVERS_CONFIG, cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG));
System.out.println(cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG));
this.properties = properties;
kafkaConsumer = new KafkaConsumer<>(properties);
}
public ServiceLocatorConsumer(Properties properties) throws ExecutionException {
client = new OkHttpClient.Builder().build();
initCache();
createConsumer(properties);
}
public Map<String,String> getServiceConfiguration() throws IOException {
OkHttpClient client = new OkHttpClient.Builder()
.callTimeout(5, TimeUnit.MINUTES)
.connectTimeout(5, TimeUnit.MINUTES)
.addInterceptor(new BasicInterceptor("user1", "password1"))
.build();
Map<String, String> kafkaSvcLocMap = new HashMap<>();
String topic = getTopicConfiguraion(client);
kafkaSvcLocMap.put(KAFKA_TOPIC_KEY, topic);
String bootstrapServersConfig = getBootstrapServersConfig(client);
kafkaSvcLocMap.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
return kafkaSvcLocMap;
}
@NotNull
private String getBootstrapServersConfig(OkHttpClient client) throws IOException {
Request request = new Request.Builder()
.url(SERVICE_LOCATOR_BASE_URL + "kafka_clusters?domain=example.org")
.build();
Response response = client.newCall(request).execute();
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
JsonObject jsonObject = JsonParser.parseString(response.body().string()).getAsJsonObject();
System.out.println("bootstrap:" + jsonObject.get("bootstrap_servers").getAsString());
return jsonObject.get("bootstrap_servers").getAsString();
}
@NotNull
private String getTopicConfiguraion(OkHttpClient client) throws IOException {
Request request = new Request.Builder()
.url(SERVICE_LOCATOR_BASE_URL + "channels?channel_name=channel1")
.build();
Response response = client.newCall(request).execute();
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
JsonObject jsonObject = JsonParser.parseString(response.body().string()).getAsJsonObject();
System.out.println(jsonObject.get("resolved_value").getAsString());
JsonObject innerObject = JsonParser.parseString(jsonObject.get("resolved_value").getAsString()).getAsJsonObject();
System.out.println(innerObject.get("topic1"));
return innerObject.get("topic1").getAsString();
}
@Override
public ConsumerRecords<K, V> poll(long l) {
return kafkaConsumer.poll(l);
}
@Override
public ConsumerRecords<K, V> poll(Duration duration) {
String bootstrapServers = null;
String topic = null;
try {
bootstrapServers = cache.get(CACHE_KEY).get(BOOTSTRAP_SERVERS_CONFIG);
topic = cache.get(CACHE_KEY).get(KAFKA_TOPIC_KEY);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
if (!this.properties.get(BOOTSTRAP_SERVERS_CONFIG).equals(bootstrapServers) ||
!this.listTopics().containsKey(topic)) {
System.out.printf(
"Need to update bootstrap servers config to %s from %s and create a new Consumer\n", bootstrapServers, properties.get(BOOTSTRAP_SERVERS_CONFIG));
properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.unsubscribe();
this.close();
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList(topic));
}
return kafkaConsumer.poll(duration);
}
Kafka Consumer App
Just like the Kafka Producer App, the Consumer App instantiates the ServiceLocatorConsumer and seamlessly starts consuming from the obtained topic. You can see how the commented out code for the bootstrap servers and the topic show the dynamic nature of the whole system. Also notice that if a configuration change happens during the poll, we automatically refresh the bootstrap servers or the topic configuration and take care of: 1/ updating the bootstrap servers, 2/ on a topic change, we unsubscribe to the previous topics, close the consumer, create a new consumer and subscribe to the new topic(s). Post this, everything gets back to normal.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class App
{
public static void main( String[] args ) throws ExecutionException, InterruptedException {
Properties config = new Properties();
try {
config.put("client.id", InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
// Not adding the bootstrap.servers config because it will be retrieved automatically
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(GROUP_ID_CONFIG, "kafka-java-consumer");
config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// topic is obtained automatically from the producer and updated if it is different
// when we receive the topic from the ConsumerRecord received from the poll method
try (final ServiceLocatorConsumer consumer = new ServiceLocatorConsumer<>(config)) {
consumer.subscribe(Arrays.asList(consumer.getTopic()));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
String topic = record.topic();
System.out.println(
String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
}
}
}
}
}
Kafka Consumer in Action
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
C:\Users\nragh\.jdks\corretto-11.0.22\bin\java.exe "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2023.2.2\lib\idea_rt.jar=55738:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2023.2.2\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\nragh\IdeaProjects\kakfa-consumer\target\classes;C:\Users\nragh\.m2\repository\org\apache\kafka\kafka-clients\7.6.1-ce\kafka-clients-7.6.1-ce.jar;C:\Users\nragh\.m2\repository\io\confluent\telemetry-events-api\7.6.1-ce\telemetry-events-api-7.6.1-ce.jar;C:\Users\nragh\.m2\repository\com\github\luben\zstd-jni\1.5.5-1\zstd-jni-1.5.5-1.jar;C:\Users\nragh\.m2\repository\org\lz4\lz4-java\1.8.0\lz4-java-1.8.0.jar;C:\Users\nragh\.m2\repository\org\xerial\snappy\snappy-java\1.1.10.5\snappy-java-1.1.10.5.jar;C:\Users\nragh\.m2\repository\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;C:\Users\nragh\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.11.0\jackson-databind-2.11.0.jar;C:\Users\nragh\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.11.0\jackson-annotations-2.11.0.jar;C:\Users\nragh\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.11.0\jackson-core-2.11.0.jar;C:\Users\nragh\.m2\repository\org\apache\commons\commons-lang3\3.14.0\commons-lang3-3.14.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okhttp3\okhttp\4.12.0\okhttp-4.12.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okio\okio\3.6.0\okio-3.6.0.jar;C:\Users\nragh\.m2\repository\com\squareup\okio\okio-jvm\3.6.0\okio-jvm-3.6.0.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-common\1.9.10\kotlin-stdlib-common-1.9.10.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk8\1.8.21\kotlin-stdlib-jdk8-1.8.21.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib\1.8.21\kotlin-stdlib-1.8.21.jar;C:\Users\nragh\.m2\repository\org\jetbrains\annotations\13.0\annotations-13.0.jar;C:\Users\nragh\.m2\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk7\1.8.21\kotlin-stdlib-jdk7-1.8.21.jar;C:\Users\nragh\.m2\repository\com\google\guava\guava\33.2.1-jre\guava-33.2.1-jre.jar;C:\Users\nragh\.m2\repository\com\google\guava\failureaccess\1.0.2\failureaccess-1.0.2.jar;C:\Users\nragh\.m2\repository\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;C:\Users\nragh\.m2\repository\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;C:\Users\nragh\.m2\repository\org\checkerframework\checker-qual\3.42.0\checker-qual-3.42.0.jar;C:\Users\nragh\.m2\repository\com\google\errorprone\error_prone_annotations\2.26.1\error_prone_annotations-2.26.1.jar;C:\Users\nragh\.m2\repository\com\google\j2objc\j2objc-annotations\3.0.0\j2objc-annotations-3.0.0.jar;C:\Users\nragh\.m2\repository\com\google\code\gson\gson\2.11.0\gson-2.11.0.jar com.platformatory.App
{"topic1": "users", "topic2": "users-1"}
"users"
bootstrap:localhost:9092,localhost:9093,localhost:9094
localhost:9092,localhost:9093,localhost:9094
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Consumed event from topic users: key = justie value = myers
Consumed event from topic users: key = justie value = crease
Consumed event from topic users: key = justie value = crease
Consumed event from topic users: key = justie value = crease
Consumed event from topic users: key = justie value = crease
Consumed event from topic users: key = justie value = hill
Consumed event from topic users: key = justie value = myers
Consumed event from topic users: key = justie value = myers
...
bootstrap:localhost:9092,localhost:9093
Need to update bootstrap servers config to {localhost:9092,localhost:9093} from {localhost:9092,localhost:9093,localhost:9094} and create a new Consumer
Consumed event from topic users-1: key = joe value = crease
Consumed event from topic users-1: key = joe value = myers
Consumed event from topic users-1: key = joe value = hill
Consumed event from topic users-1: key = joe value = hill
Consumed event from topic users-1: key = joe value = hill
Consumed event from topic users-1: key = joe value = myers
Consumed event from topic users-1: key = justie value = crease
Consumed event from topic users-1: key = justie value = crease
Consumed event from topic users-1: key = justie value = myers
Consumed event from topic users-1: key = justie value = hill
Consumed event from topic users-1: key = justie value = hill
...
Python Implementation
Coming soon…
Conclusion
Creating a system that helps a centralized Kafka infrastructure team to easily create, label and vend information reduces common problems and dependencies. Same benefits are passed on to the producers and consumers thus creating a scalable system/organization.
About The Authors
About The Authors