Kafka Streams Push Gateway
Introduction
Organizations like DoorDash, UberEats, Swiggy, and Zomato take orders from customers, store them in an operational database, and continually update the status of each order. These status updates need to be visible to customers so they know what is happening to their orders (e.g., order accepted, order in progress, order ready for delivery, order en route, and order delivered, among the many possible states).
What’s Needed to Implement This System?
To send these updates to customers (possibly hundreds or thousands per second), the organization needs to receive updates from its suppliers (kitchens, restaurants), update its operational database, capture the changes (Change Data Capture, or CDC) into a Kafka topic, have consumers read the records from that topic, sift out the ones that require updates, maintain connections to the customers, and send the updates. In doing all this, it must handle failures that occur during the updates (possibly using durable execution via Temporal Workflows and Activities).
Illustrative Steps to Implementing Real-Time Order Updates
- Centralize Data: Store all relevant order data in a Kafka topic.
- Schema Management: Ensure the data in Kafka has a well-defined schema.
- Stream Processing: Develop a Kafka Streams application to process updates as they become available.
- Key Mapping: Implement a mechanism to map topics/partitions to orders/users.
- Stream Topology: Define a stream topology with a GroupBy key operation, repartition the topic, and make the data accessible via a web interface.
- Client Communication: Implement a polling or asynchronous mechanism (like WebSockets) to push updates to clients.
Our Solution - Streams Push Gateway Accelerator
We have built a system that automatically subscribes clients to updates for specific data in the records/messages of a Kafka topic and routes those updates to the subscribers.
Overview of the System
Order Updates Flow
- Order Service: Writes order updates (records containing customer_id and order_id) to the operational database.
- CDC Integration: A Kafka Connect application captures CDC updates and writes them to a Kafka topic (see the sketch after this list).
- Kafka Streams Application: Consumes this data, performs a GroupBy operation on the key field (e.g., order_id or customer_id), and stores the result in a local state store.
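As an illustration of the CDC step, a Kafka Connect source connector could be registered with a configuration like the sketch below. Debezium for PostgreSQL is used purely as an example; the connector class is real, but every hostname, credential, and table name is a placeholder:

{
  "name": "orders-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "orders-db.internal",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.password": "cdc_password",
    "database.dbname": "orders",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders"
  }
}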
Client Communication
- A mobile application requests updates for a specific order via an API Gateway, which forwards the request to the Kafka Streams application.
- The connection is upgraded to a WebSocket, through which the Kafka Streams application sends regular updates (configurable interval) back to the client.
API Gateway
- The API Gateway handles authentication, authorization, and request/response transformations.
- It provides load balancing by routing calls to different Kafka Streams Application instances, ensuring even distribution of traffic and avoiding single points of failure.
API Gateway
The API Gateway (we recommend and use Kong) handles authentication. It also provides load balancing by routing calls across Kafka Streams Application instances through an upstream (a Kong construct for load balancing across multiple targets, where each target is a hostname:port pair).
Refer to Kong's upstreams documentation to create the upstream.
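For illustration, a declarative (decK-style) Kong configuration with an upstream and two Kafka Streams targets might look like the sketch below; the service, route, host, and port values are all assumptions:

_format_version: "3.0"
services:
  - name: kafka-streams-service
    # pointing the service host at the upstream name enables load balancing
    host: kafka-streams-upstream
    port: 7070
    protocol: http
    routes:
      - name: order-updates
        paths:
          - /update/orders
upstreams:
  - name: kafka-streams-upstream
    targets:
      # hypothetical Kafka Streams Application instances (hostname:port)
      - target: ks-app-1.internal:7070
      - target: ks-app-2.internal:7070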
Kafka Streams Application
The Kafka Streams application consists of:
- Declarative Configuration: Specifies the Kafka bootstrap servers, application ID, state store location, input/output topics, and keys for the GroupBy operation.
- REST Endpoint: Configured as an upstream target for the API Gateway, enabling other Kafka Streams instances to query keys not present in their local store.
- Stream Topology: Manages the GroupBy operation for the specified key (e.g., order_id).
- WebSocket Communication: Upgrades incoming connections to WebSocket and publishes updates.
- Data Routing: Routes requests to the correct host if the requested key is not available locally.
Declarative configuration
The Kafka Streams application can be started with a declarative configuration like the one below.
kafka:
  bootstrap-servers: localhost:9092
  streams:
    application-id: kafka-streams-app
    default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default-value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    properties:
      commit.interval.ms: 1000
      cache.max.bytes.buffering: 10485760
    state-dir: /tmp/kafka-streams
  consumer:
    group-id: kafka-streams-group
    auto-offset-reset: earliest
  producer:
    retries: 3
    batch-size: 16384
    buffer-memory: 33554432
topics:
  input-topic: orders
  output-topic: orders-by-key
application:
  stream:
    group-by-key-enabled: true
    group-by-key-operation:
      # the key could be any field within the record and can be
      # addressed using a format like 'orders.items.item_id'
      key: order_id
      description: Group orders by their key (e.g., order ID or customer ID)
    update:
      interval: 5000
      description: Send updates to clients periodically using the interval (5000 means 5 seconds)
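For illustration, the GroupBy topology driven by this configuration might be wired up as in the sketch below. The store name orders-store and the extractOrderId helper are assumptions; a real implementation would resolve the configured key path (e.g., orders.items.item_id) rather than hard-coding order_id.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class OrdersTopology {

    // assumed store name; must match the store queried by the REST and WebSocket layers
    public static final String STORE_NAME = "orders-store";

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
                // re-key each record by order_id; this forces a repartition so
                // that all updates for an order land on the same instance
                .selectKey((ignoredKey, value) -> extractOrderId(value))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // keep only the latest update per order_id in a queryable store
                .reduce((oldValue, newValue) -> newValue, Materialized.as(STORE_NAME))
                .toStream()
                .to("orders-by-key", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // hypothetical helper: pull order_id out of a JSON record payload; a real
    // implementation would use a proper JSON/Avro deserializer
    private static String extractOrderId(final String value) {
        return value.replaceAll(".*\"order_id\"\\s*:\\s*\"([^\"]+)\".*", "$1");
    }
}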
REST Endpoint
- Retrieve the status of an order using the endpoint: /update/orders/{order_id}.
- Implemented using Spring Boot and integrated into the Kafka Streams application; a minimal sketch follows below.
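A minimal sketch of that endpoint, assuming the orders-store name from the topology sketch above; KeyValueBean and NotFoundException are the same helper classes used in the WebSocket code that follows:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderUpdateController {

    private final KafkaStreams kafkaStreams;
    private final String storeName = "orders-store"; // assumed store name

    public OrderUpdateController(final KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @GetMapping("/update/orders/{orderId}")
    public KeyValueBean getOrderStatus(@PathVariable final String orderId) {
        // query the local state store built by the GroupBy topology
        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
        final String value = store.get(orderId);
        if (value == null) {
            throw new NotFoundException();
        }
        return new KeyValueBean(orderId, value);
    }
}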
WebSocket Implementation
The WebSocketConfig configuration class registers the handler and the path to listen on.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final KeyValueHandler keyValueHandler;

    @Autowired
    public WebSocketConfig(KeyValueHandler keyValueHandler) {
        this.keyValueHandler = keyValueHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // register the handler for the order-updates path; the interceptor
        // extracts the {key} path segment during the handshake
        registry.addHandler(keyValueHandler, "/update/orders/{key}")
                .addInterceptors(new KeyValueInterceptor())
                .setAllowedOrigins("*");
    }
}
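The KeyValueInterceptor is not shown in the original listing; a minimal sketch, assuming it only lifts the {key} path segment into the session attributes read by the handler, could look like this:

import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

public class KeyValueInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
        // Extract the trailing path segment (the order key) and stash it in
        // the session attributes so the handler can read it later.
        final String path = request.getURI().getPath();
        attributes.put("key", path.substring(path.lastIndexOf('/') + 1));
        return true; // proceed with the handshake
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // no-op
    }
}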
We implement the logic to retrieve the data (only relevant methods are shown here)
private void sendMessage(WebSocketSession session) throws IOException {
    String key = (String) session.getAttributes().get("key");
    final HostStoreInfo hostStoreInfo = streamsMetadataForStoreAndKey(storeName, key);
    if (!thisHost(hostStoreInfo)) {
        // the key lives on another instance: fetch it remotely and stop here
        session.sendMessage(new TextMessage(
                fetchByKey(hostStoreInfo, "update/orders/" + key).toString()));
        return;
    }
    final ReadOnlyKeyValueStore<String, String> store =
            kafkaStreams.store(fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
    if (store == null) {
        throw new NotFoundException();
    }
    // Get the value from the local store
    final String value = store.get(key);
    if (value == null) {
        throw new NotFoundException();
    }
    session.sendMessage(new TextMessage(new KeyValueBean(key, value).toString()));
}
Retrieve the data from the host that actually owns it (routing to a different host)
private boolean thisHost(final HostStoreInfo host) {
    return host.getHost().equals(hostInfo.host()) &&
           host.getPort() == hostInfo.port();
}

private KeyValueBean fetchByKey(final HostStoreInfo host, final String path) {
    // build the URL from the owning host's host:port, not this instance's
    String url = String.format("http://%s:%d/%s", host.getHost(), host.getPort(), path);
    // Make the GET request
    ResponseEntity<KeyValueBean> response = restTemplate.getForEntity(url, KeyValueBean.class);
    // Return the body of the response
    return response.getBody();
}

public HostStoreInfo streamsMetadataForStoreAndKey(final String store,
                                                   final String key) {
    return metadataService.streamsMetadataForStoreAndKey(store, key, new StringSerializer());
}
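The metadataService used above wraps Kafka Streams' interactive-query metadata API. A minimal sketch, assuming HostStoreInfo exposes a simple host/port constructor, might look like this:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

public class MetadataService {

    private final KafkaStreams kafkaStreams;

    public MetadataService(final KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    public HostStoreInfo streamsMetadataForStoreAndKey(final String store,
                                                       final String key,
                                                       final Serializer<String> serializer) {
        // locate the active host for the partition that owns this key
        final KeyQueryMetadata metadata =
                kafkaStreams.queryMetadataForKey(store, key, serializer);
        if (metadata == null || metadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
            throw new IllegalStateException("Streams metadata not available for key " + key);
        }
        return new HostStoreInfo(metadata.activeHost().host(), metadata.activeHost().port());
    }
}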
Client
The client could be a mobile application that opens a connection to the Kafka Streams application via the API Gateway. The connection is upgraded to a WebSocket connection, and data is received at regular intervals (configured in the streams application). Once the client has received all the updates, it can close the connection.
WebSocket Payload and Response
An HTTP GET connection is upgraded to a WebSocket Secure (WSS) connection. The payload structure of a WebSocket message depends on the protocol and use case. Here’s an example of a WebSocket request payload in JSON format, which is common in many real-time applications.
Client to Server (Requesting Order Updates for a Specific Order ID)
{
  "type": "order_status_updates_request",
  "data": {
    "order_id": "12345"
  }
}
Server to Client (Sending Order Updates)
{
  "type": "order_status_update",
  "data": {
    "order_id": "12345",
    "status": "Shipped",
    "estimated_delivery": "2024-08-28",
    "tracking_number": "1Z999AA10123456784"
  }
}
Wscat commands with Basic Authentication (handled by the API Gateway)
echo -n "username:password" | base64
# Assume that the base64 encoding results in "dXNlcm5hbWU6cGFzc3dvcmQ="
# Call the order updates API via the Kong API Gateway with basic authentication
wscat -c "wss://localhost:8000/update/orders/12345" -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ="
After the successful connection, send
{
  "type": "order_status_updates_request",
  "data": {
    "order_id": "12345"
  }
}
Conclusion
The Streams Push Gateway Accelerator leverages Kafka Streams, RESTful endpoints, and WebSockets to deliver a robust, scalable solution for pushing real-time order updates to clients. By integrating with an API Gateway and employing efficient stream processing techniques, this solution ensures that customers receive timely updates on their orders, enhancing the user experience.
Please contact us at hello@platformatory.com to discuss your use case and we would be happy to help!
References
Confluent Kafka Streams Interactive Queries: https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html
About The Authors