Using Kafka REST Proxy with Protobuf
The Kafka REST Proxy provides a RESTful API to produce and consume messages and to view metadata about the cluster, brokers, partitions, and topics.
Using the REST API is the fastest way to experiment with producing and consuming messages from a Kafka broker. But this API set is not simple to use, because the same API endpoints work with different serialization formats. In this post, we use Confluent 5.5, which added support for Protobuf and JSON Schema to the Schema Registry. We will use the API endpoint to produce messages to a Kafka topic with a Protobuf schema, and the API endpoints to consume messages from that topic.
Content Types
Content types are the first and essential thing to look at when using this API set. They are used in the Content-Type and Accept headers of the HTTP requests.
This REST API has three versions: v1, v2, and v3. Version v3 is in the preview stage, so we do not look at it here. The content types follow this pattern:
application/vnd.kafka.{embedded format}.{API version}+json
Here are the samples:
The Avro content type is application/vnd.kafka.avro.v2+json
The Protobuf content type is application/vnd.kafka.protobuf.v2+json
The JSON Schema content type is application/vnd.kafka.jsonschema.v2+json
We will see how we use it in the next sections.
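For requests that do not carry a serialized message, such as metadata lookups, only the API version is embedded: application/vnd.kafka.v2+json. For example, here is a quick sketch of listing the topics on the cluster (host and port are placeholders):

curl --location --request GET 'http://{RestProxyHost}:{RestProxyPort}/topics' \
--header 'Accept: application/vnd.kafka.v2+json'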
Message Schema
First, we define a schema for the producer and the consumer. We use the same schema for both here, but either side can evolve its schema at any time without impacting the other end; that is the purpose of using a schema. Here is the schema, in Protobuf format, that we use in this post.
message value_salesorder_topic {
  required uint32 orderId = 1;
  optional string orderType = 2;
  required uint32 plant = 3;
  optional string soldToCustomer = 4;
  optional string targetSystem = 5;
}
Use the Confluent Control Center to set the schema for the topic.
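If Control Center is not available, the same schema can also be registered directly through the Schema Registry REST API. Here is a sketch (host and port are placeholders; the subject name follows the {topic}-value convention):

curl --location --request POST 'http://{SchemaRegistryHost}:{SchemaRegistryPort}/subjects/salesorder_topic-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data-raw '{
  "schemaType": "PROTOBUF",
  "schema": "\nmessage value_salesorder_topic {\n required uint32 orderId = 1;\n optional string orderType = 2;\n required uint32 plant = 3;\n optional string soldToCustomer = 4;\n optional string targetSystem = 5;\n}\n"
}'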
Produce Message
We will send messages to this Kafka topic “salesorder_topic”.
The endpoint
POST /topics/salesorder_topic
Content-Type
We use the Protobuf schema and version 2 of the API, so this is the Content-Type header:
application/vnd.kafka.protobuf.v2+json
Before making the HTTP request to send a message, we need the schema ID that we set on the topic in the previous step.
Make a request to the Schema Registry:
curl --location --request GET 'http://{SchemaRegistryHost}:{SchemaRegistryPort}/subjects/salesorder_topic-value/versions/1'
Response: we get the schema ID 41.
{
  "subject": "salesorder_topic-value",
  "version": 1,
  "id": 41,
  "schemaType": "PROTOBUF",
  "schema": "\nmessage value_salesorder_topic {\n required uint32 orderId = 1;\n optional string orderType = 2;\n required uint32 plant = 3;\n optional string soldToCustomer = 4;\n optional string targetSystem = 5;\n}\n"
}
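If jq is installed on the machine, the ID can be extracted directly (a convenience sketch, assuming jq is available):

curl --silent 'http://{SchemaRegistryHost}:{SchemaRegistryPort}/subjects/salesorder_topic-value/versions/1' | jq '.id'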
Putting it all together, we have this request to send a message:
curl --location --request POST 'http://{RestProxyHost}:{RestProxyPort}/topics/salesorder_topic' \
--header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
--data-raw '{
  "key_schema": null,
  "value_schema_id": 41,
  "records": [
    {
      "key": null,
      "value": {
        "orderId": 1,
        "orderType": "SERVICE",
        "plant": 5000,
        "soldToCustomer": "2000",
        "targetSystem": "SYS1"
      }
    }
  ]
}'
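If the request succeeds, the REST Proxy responds with the partition and offset of each written record and the schema IDs it used. The response looks similar to this (the exact partition and offset will vary):

{
  "key_schema_id": null,
  "value_schema_id": 41,
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ]
}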
Notice that the host and port to send requests to are different for the REST Proxy and the Schema Registry.
Consume Message
Unlike producing, this is not a single request to a single endpoint. We have to follow these steps in order to consume messages from the Kafka topic:
1. Create a consumer instance.
2. Subscribe to a topic or list of topics.
3. Consume messages.
Create a consumer instance
This is a stateful request: the consumer instance is removed after 5 minutes of idle time, and the “base_uri” value in the response is used in the next steps.
The endpoint
POST /consumers/{group name}
Request
curl --location --request POST 'http://{RestProxyHost}:{RestProxyPort}/consumers/consumergroup' \
--header 'Content-Type: application/vnd.kafka.v2+json' \
--data-raw '{
  "name": "my_consumer_proto",
  "format": "protobuf"
}'
Notice that the format value of “protobuf” is required because we are using a Protobuf schema. The value is “avro” or “jsonschema” for Avro and JSON Schema respectively.
Example Response
{
  "instance_id": "my_consumer_proto",
  "base_uri": "http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto"
}
Subscribe to a topic
The endpoint
{value of base_uri from the previous step}/subscription
Request
curl --location --request POST 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto/subscription' \
--header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
--data-raw '{
  "topics": [
    "salesorder_topic"
  ]
}'
Notice the Content-Type header and the topic name: this is the topic with the Protobuf schema that we want to consume messages from.
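To double-check that the subscription took effect, the same endpoint can also be queried with GET (a quick verification sketch; it returns the list of subscribed topics):

curl --location --request GET 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto/subscription' \
--header 'Accept: application/vnd.kafka.v2+json'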
Consume messages
The endpoint
{value of base_uri from the previous step}/records
Accept header
Accept: application/vnd.kafka.protobuf.v2+json
Request
curl --location --request GET 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto/records?timeout=3000&max_bytes=300000' \
--header 'Accept: application/vnd.kafka.protobuf.v2+json'
Notice the Accept header: since we created the consumer instance in the previous step with the format “protobuf”, we have to specify the matching content type that we will accept for the response messages.
Example Response
[
  {
    "topic": "salesorder_topic",
    "key": null,
    "value": {
      "orderId": 6,
      "orderType": "SERVICE",
      "plant": 5000,
      "soldToCustomer": "2000",
      "targetSystem": "SYS1"
    },
    "partition": 0,
    "offset": 1
  }
]
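When we are done, we do not have to wait 5 minutes for the consumer instance to expire; it can be deleted explicitly with a DELETE request to the base_uri (a cleanup sketch):

curl --location --request DELETE 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto' \
--header 'Content-Type: application/vnd.kafka.v2+json'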
Conclusion
The REST Proxy API is a wrapper around the Java client library, and it is useful for testing sending and receiving messages with Kafka. Still, it is not easy to use: as we have seen, it takes several steps to receive messages, and the consumer instance expires after 5 minutes of idle time.
We can use the same steps for other formats such as Avro and JSON Schema; all we need is to replace the content types appropriately.
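For reference, here is a minimal shell sketch of the whole flow in one place (hosts, ports, and the schema ID 41 are placeholders taken from the examples above; error handling is omitted):

#!/bin/sh
PROXY='http://{RestProxyHost}:{RestProxyPort}'

# 1. Produce a message with the Protobuf value schema (ID 41).
curl --request POST "$PROXY/topics/salesorder_topic" \
  --header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
  --data-raw '{"value_schema_id": 41, "records": [{"value": {"orderId": 1, "orderType": "SERVICE", "plant": 5000, "soldToCustomer": "2000", "targetSystem": "SYS1"}}]}'

# 2. Create a consumer instance in the group "consumergroup".
curl --request POST "$PROXY/consumers/consumergroup" \
  --header 'Content-Type: application/vnd.kafka.v2+json' \
  --data-raw '{"name": "my_consumer_proto", "format": "protobuf"}'

BASE="$PROXY/consumers/consumergroup/instances/my_consumer_proto"

# 3. Subscribe the instance to the topic.
curl --request POST "$BASE/subscription" \
  --header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
  --data-raw '{"topics": ["salesorder_topic"]}'

# 4. Read records, then delete the consumer instance.
curl --request GET "$BASE/records?timeout=3000&max_bytes=300000" \
  --header 'Accept: application/vnd.kafka.protobuf.v2+json'
curl --request DELETE "$BASE" \
  --header 'Content-Type: application/vnd.kafka.v2+json'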
References
Confluent REST Proxy API reference
https://docs.confluent.io/current/kafka-rest/api.html