Using Kafka REST Proxy with Protobuf

The Kafka REST Proxy provides REST API to produce and consume messages and view metadata of cluster, broker, partition, and topic.

Using REST API is the fastest way to experiment with producing and consuming messages from Kafka broker. But it is not simple to use this API set, as with the same API endpoint, it works for different serialization formats. In this post, we use Confluent 5.5, which adding support of Protobuf and JSON Schema to the Schema Registry, and we will use API endpoint to produce messages to a Kafka topic with Protobuf schema, and use API endpoints to consume messages from the topic.

Content Types

Content types are the first and essential thing to look at when using the API set. It used for Content-Type and Accept headers of the HTTP request.

This REST API has three versions v1, v2, and v3, but version v3 is in the preview stage, and we do not look at it here

application/vnd.kafka.{Embeded format}.{API version}+json

Here are the samples:

The Avro content type is application/vnd.kafka.avro.v2+json

The Protobuf content type is application/vnd.kafka.protobuf.v2+json

The JSON schema content type is application/vnd.kafka.jsonschema.v2+json

We will see how we use it in the next sections.

Message Schema

First, we define a schema for producer and consumer, we use the same for both here, but it can involve the schema any time without any impact of other ends. That is the purpose of using schema. Here is the schema of Protobuf format that we use in this post.

message value_salesorder_topic {
  required uint32 orderId = 1;
  optional string orderType = 2;
  required uint32 plant = 3;
  optional string soldToCustomer = 4;
  optional string targetSystem = 5;
}

Use the Confluent Control Center to set the schema for the topic

Confluent Control Center

Produce Message

We will send messages to this Kafka topic “salesorder_topic”.

The endpoint

POST /topic/salesorder_topic

Content-Type

We use Protobuf schema and version 2 of the API. And this is the Content-Type header.

application/vnd.kafka.protobuf.v2+json

Before making the HTTP request to send a message, we need the schema ID that we set to the topic from the previous step.

Make a request to the Schema Registry:

curl --location --request GET 'http://{SchemaRegistryHost}:{SchemaRegistryPort}/subjects/salesorder_topic-value/versions/1'

Response: we got the schema ID 41

{
    "subject": "salesorder_topic-value",
    "version": 1,
    "id": 41,
    "schemaType": "PROTOBUF",
    "schema": "\nmessage value_salesorder_topic {\n  required uint32 orderId = 1;\n  optional string orderType = 2;\n  required uint32 plant = 3;\n  optional string soldToCustomer = 4;\n  optional string targetSystem = 5;\n}\n"
}

Put altogether we have this request to send a message:

curl --location --request POST 'http://{RestProxyHost}:{RestProxyPort}/topics/salesorder_topic' \
--header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
--data-raw '{
  "key_schema": null,
  "value_schema_id": 41,
  "records": [
    {
      "key": null,
      "value": {
      	"orderId": 1,
      	"orderType": "SERVICE",
      	"plant": 5000,
      	"soldToCustomer": "2000",
      	"targetSystem": "SYS1"
      }
    }
  ]
}'

Noticed the host and port to send requests are different for the REST Proxy and the Schema Registry.

Consume Message

It is not a single endpoint request as the producer. It has to follow steps in order to consume messages from the Kafka topic.

1.       Create a consumer instance

2.       Subscribe to a topic or list of topics.

3.       Consume messages

Create a consumer instance

This is a stateful request, the consumer instance is removed after 5 minutes idle, and the output of “base_uri” is used for the next steps.

The endpoint

POST / consumers /{group name}

Request

curl --location --request POST 'http://{RestProxyHost}:{RestProxyPort}/consumers/consumergroup' \
--header 'Content-Type: application/vnd.kafka.v2+json' \
--data-raw '{
  "name": "my_consumer_proto",
  "format": "protobuf"
}'

Notice the format value of “protobuf” is required as we are using Protobuf schema. The value is “avro” and “jsonschema” for Avro and JSON schema accordingly.

Example Response

{
    "instance_id": "my_consumer_proto",
    "base_uri": "http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto"
}

Subscribe to a topic

The endpoint

{value fo base_uri from the previous step}/subscription

Request

curl --location --request POST 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto/subscription' \
--header 'Content-Type: application/vnd.kafka.protobuf.v2+json' \
--data-raw '{
  "topics": [
    "salesorder_topic"
  ]
}'

Notice at Content-Type and topic name, this is topic with Protobuf schema that we want to consume messages.

Consume messages

The endpoint

{value fo base_uri from the previous step}/records

Accept header

Accept: application/vnd.kafka.protobuf.v2+json

Request

curl --location --request GET 'http://kafkaproxy.sample/consumers/consumergroup/instances/my_consumer_proto/records?timeout=3000&max_bytes=300000' \
--header 'Accept: application/vnd.kafka.protobuf.v2+json'

Notice at Accept header, when we created a group instance from the previous step with the format “protobuf” then we have to specify which content type we will accept for the response message.

Example Response

[
    {
        "topic": "salesorder_topic",
        "key": null,
        "value": {
            "orderId": 6,
            "orderType": "SERVICE",
            "plant": 5000,
            "soldToCustomer": "2000",
            "targetSystem": "SYS1"
        },
        "partition": 0,
        "offset": 1
    }
]

Conclusion

The REST Proxy API is a wrapper from the Java library and it is useful to test sending and receiving messages from Kafka. Still, it is not easy to use as we see it takes a couple of steps to receive messages and the group instance is expired every 5 minutes.

The same steps we can use for other content types such as Avro, JSON schema, all we need is to replace content type appropriately.

References

Confluent REST Proxy API reference
https://docs.confluent.io/current/kafka-rest/api.html