Kafka Subscriber
5 minute read
Streams Kafka subscriber allows clients to route events published in a Streams topic to an external Kafka topic.
Create a Kafka subscription
You can create a kafka subscription by making an HTTP post request on the following endpoint:
POST /streams/subscribers/kafka/api/v1/topics/{topicID}/subscriptions
The request body must contain a JSON kafka subscription configuration. For example:
{
"subscriptionMode": "snapshot-only",
"bootstrapServers": "my-kafka-cluster:9092",
"topic": "kafka-topic-to-publish-to",
"partition": 0,
"recordKey": "optional-custom-record-key",
"authorization": {
"type": "sasl_ssl",
"sasl": {
"mechanism": "plain",
"username": "myusername",
"password": "mypassword"
}
}
}
Configuration Entry | Mandatory | Default value | Description |
---|---|---|---|
subscriptionMode | no | Default subscription mode defined in the topic’s configuration | For more information, see subscription modes. |
bootstrapServers | yes | n/a | List of Kafka servers used to bootstrap connections to Kafka. |
topic | yes | n/a | Kafka topic in which a record must be sent. |
partition | no | n/a | Kafka partition to use. |
recordKey | no | topic id | Record key to use for each sent record. If not set, the topicId is used. |
authorization | no | n/a | Security configuration for connection to the external kafka broker. For more information, see section Security configuration with SASL and SSL. |
After the kafka subscription is successfully created, Streams start sending records to your kafka cluster.
Security configuration with SASL and SSL
The Kafka subscriber supports connection with external kafka broker configured with Secure sockets layer (SSL) and Simple authentication and security layer (SASL).
Note
SSL and SASL must be activated simultaneously on the kafka broker. The Kafka subscriber does not support SASL configured without SSL for transport layerThe following table describes the security configuration that you can setup when creating a kafka subscription:
Configuration Entry | Mandatory | Default value | Description |
---|---|---|---|
type | yes | n/a | Type of security protocol configured for the kafka subscription. Currently only sasl_ssl is supported. |
sasl.mechanism | yes | n/a | SASL mechanism used by the subscriber kafka to connect with the external kafka broker. Currently only plain mechanism is supported. |
sasl.username | yes | n/a | Username used by the Kafka subscriber to connect with the external kafka broker. |
sasl.password | yes | n/a | Password used by the Kafka subscriber to connect with the external kafka broker. |
Configure a custom SSL root certificate authority
With SSL enabled, Streams needs to trust the root certificate authority that signed your kafka broker certificates. If you are not using one of the generic SSL certificates providers (for example, Digicert, Let’s Encrypt, and so on), but a custom root certificate authority instead, you must add it to Streams:
Ensure that the root certificate authority you have is in PEM format. You can list and extract the root CA from a truststore with the following command:
keytool -list -v -keystore <KAFKA TRUSTORE IN JKS>
...
Your keystore contains 1 entry
Alias name: caroot
Creation date: Apr 22, 2022
Entry type: trustedCertEntry
...
keytool -exportcert -rfc -keystore <KAFKA TRUSTORE IN JKS> -alias <ALIAS NAME OF ROOT CA> -file kafka.truststore.pem
In this example, the alias name of the root CA is caroot
.
For more information on how to add the custom certificate, see Add self-signed TLS certificates, or ask your operator to perform this task if you are not operating Streams yourself.
Create status codes
The following are HTTP status codes that can be returned when trying to create a kafka subscription:
Code | Comment |
---|---|
201 Created | Indicates that the subscription request is valid and has been created. |
400 Bad Request | Indicates that the provided data is invalid. |
404 Not found | Indicates that the requested URL does not exist. |
Besides those specific errors, you can also receive any of the generic Streams API errors.
Stop a kafka subscription
To stop sending records to your kafka cluster, delete the corresponding kafka subscription with the following request:
DELETE /streams/subscribers/kafka/api/v1/subscriptions/{subscriptionId}
Delete status codes
The following are HTTP status codes that can be returned when deleting the kafka subscription:
Code | Comment |
---|---|
204 No Content | Indicates that the subscription has been successfully deleted. |
404 Not found | Indicates that the provided identifier does not correspond to an existing kafka subscription. |
Besides those specific errors, you can also receive any of the generic Streams API errors.
Getting a kafka subscription
To get an existing subscription, use the following GET request:
GET /streams/subscribers/kafka/api/v1/subscriptions/{subscriptionId}
Get status codes
List of HTTP status codes that can be returned when trying to get a kafka subscription:
Code | Comment |
---|---|
200 Ok | Indicates that the subscription requested is valid and has been retrieved. |
404 Not found | Indicates that the requested URL or subscription requested does not exist. |
Besides those specific errors, you can also receive any of the generic Streams API errors.
Testing a Kafka subscription
You can test a Kafka subscription by making an HTTP Post request on the following endpoint:
POST /streams/subscribers/kafka/api/v1/subscriptions/{subscriptionId}/test
The request body can contain any JSON object and it will be sent as-is to the identified subscription.
Test status codes
The following HTTP status codes can be returned while testing a Kafka subscription:
Code | Comment |
---|---|
202 Accepted | Indicates that the payload has been successfully sent to the subscription. |
400 Bad Request | Indicates that the provided data are invalid. |
404 Not found | Indicates that the requested URL does not exist. |
Besides those specific errors, you can also receive any of the generic Streams API errors.
Get kafka subscriptions for a topic
Use the following HTTP Get request on your topic to get existing subscriptions:
GET /streams/subscribers/kafka/api/v1/topics/{topicId}/subscriptions
For more information on how pagination and sorting work, see Pagination.
The field names allowed for sorting are:
- subscriptionMode
- kafkaBootstrapServers
- kafkaTopic
- kafkaPartition
- kafkaRecordKey
Kafka record
As soon as the publisher starts to publish data, the kafka subscribers start to receive the messages, and they send a record with custom headers and a payload.
Record headers
The following table describes the custom headers, which are added to the messages.
Header name | Description |
---|---|
X-Axway-Streams-Subscription-Id | Unique identifier of the kafka subscription. |
X-Axway-Streams-Topic-Id | Identifier of the topic to which the subscription belongs. |
X-Axway-Streams-Event-Id | Identifier of the event. |
X-Axway-Streams-Event-Type | Type of the payload (snapshot, patch or error). |
Record payload
The payload of the record contains your actual data, for example, events and snapshots, and subscription errors.