Kafka Subscriber
Learn how to configure and use the Streams Kafka Subscriber.
Streams supports different subscriber types. To receive events associated with a topic, a subscriber must subscribe using one of the following options: Kafka, server-sent events (SSE), Webhook, or WebSocket.
Each topic created on the platform must be associated with at least one type of subscriber.
When creating your topic, you can set the subscriber types via the subscribers parameter in the topic’s configuration.
{
  "name": "myTopic",
  ...
  "subscribers": [
    {
      "type": "kafka|sse|webhook|websocket",
      "config": {
        ...
      }
    }
  ]
  ...
}
If no subscriber is defined, the SSE subscriber is added by default.
Streams has strong requirements in terms of both quality of service and performance. To provide the best trade-off between these two conflicting aspects, Streams supports at-least-once delivery semantics.
We ensure this quality of service by keeping the last event ID delivered for each subscription, so that delivery can resume from it after a failure (for example, a network or component failure). Streams manages this mechanism internally for persistent subscribers, such as Webhook or Kafka subscribers, but for SSE it depends on a client-side mechanism. For more information, see Reconnect automatically after an interruption.
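Because at-least-once delivery can redeliver an event after a failure, subscriber-side processing is usually made idempotent. The following is a minimal Python sketch of that idea, not part of Streams itself. It assumes each delivered event is a dictionary carrying a unique identifier under a hypothetical `id` key; the actual field name depends on the subscriber type.

```python
from typing import Callable, Optional


class IdempotentHandler:
    """Processes each event at most once, keyed by its ID (hypothetical 'id' key)."""

    def __init__(self, process: Callable[[dict], None]) -> None:
        self._process = process
        self._seen = set()  # IDs already processed; unbounded in this sketch
        self.last_event_id: Optional[str] = None

    def handle(self, event: dict) -> bool:
        """Return True if the event was processed, False if it was a duplicate."""
        event_id = event["id"]
        if event_id in self._seen:
            return False
        self._process(event)
        # Record the ID only after processing succeeded, so a crash during
        # processing leads to a retry rather than a lost event.
        self._seen.add(event_id)
        self.last_event_id = event_id
        return True
```

A real consumer would typically bound or persist the set of seen IDs; the sketch keeps it in memory for brevity.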
The subscription mode determines the format of the data and the type of events sent to subscribers. The modes available to a subscriber depend on the payload type of the publisher, and the mode in use is the one selected in the subscriber’s configuration.
Publisher Payload type | Subscription mode | Description |
---|---|---|
snapshot | snapshot-only | Streams sends the entire content (snapshot) to the subscriber each time a change is detected. We recommend this mode for content that changes infrequently and is fully updated each time. |
snapshot | snapshot-patch | Streams sends an initial event containing the entire content (snapshot). The subsequent events contain only the changed fields in the form of an array of JSON Patch operations. For more information, see Understanding snapshot-patch mode. |
event | event | Streams sends the published events, as-is, over time. |
The allowedSubscriptionModes and defaultSubscriptionMode attributes must be specified in each subscriber’s configuration:
{
  "name": "myTopic",
  ...
  "subscribers": [
    {
      "type": "sse",
      "allowedSubscriptionModes": ["snapshot-only", "snapshot-patch"],
      "defaultSubscriptionMode": "snapshot-patch"
    },
    {
      "type": "webhook",
      "allowedSubscriptionModes": ["snapshot-only", "snapshot-patch"],
      "defaultSubscriptionMode": "snapshot-only"
    }
  ]
  ...
}
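Before submitting a topic configuration, it can help to check a subscriber entry against the mode table above. The following Python sketch is a client-side sanity check only, not Streams' own validation. The function name is hypothetical, and the rule that the default mode should be one of the allowed modes is an assumption.

```python
# Modes available per publisher payload type, taken from the table above.
MODES_BY_PAYLOAD_TYPE = {
    "snapshot": {"snapshot-only", "snapshot-patch"},
    "event": {"event"},
}


def validate_subscriber(subscriber: dict, payload_type: str) -> list:
    """Return a list of problems; an empty list means the entry looks consistent."""
    problems = []
    permitted = MODES_BY_PAYLOAD_TYPE[payload_type]
    allowed = subscriber.get("allowedSubscriptionModes", [])
    default = subscriber.get("defaultSubscriptionMode")

    for mode in allowed:
        if mode not in permitted:
            problems.append(
                f"mode '{mode}' is not available for payload type '{payload_type}'"
            )
    # Assumption: a default mode outside the allowed list is a mistake.
    if default is not None and allowed and default not in allowed:
        problems.append(f"default mode '{default}' is not in allowedSubscriptionModes")
    return problems
```

For the SSE entry shown above and a publisher of payload type snapshot, the function reports no problems.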
event subscription mode
The event subscription mode is the only available mode when the topic is configured with a data source publishing payloads of type event. Subscribers receive each event as published by the publisher.
For example:
{
  "Status": "Working - Contacted",
  "LastModifiedDate": "2021-02-26T14:14:44.000Z",
  "ChangeEventHeader": {
    "entityName": "Lead",
    "changeType": "UPDATE",
    "changedFields": ["Status", "LastModifiedDate"],
    "transactionKey": "0002a8c8-3c33-f5b9-9152-xxxxxx",
    "commitTimestamp": 1614348884000,
    "recordIds": ["00Q1t00000xxxxx"]
  }
}
snapshot-patch subscription mode
In the snapshot-patch subscription mode, subscribers receive only the incremental updates that Streams computes between the last two payloads published in the topic. For example, in a brokerage app, a user might subscribe to 10 different symbols, each containing fields such as identifier, last, bid, and ask, of which only a few change at each market tick. In snapshot-patch mode, Streams automatically computes the difference and sends the corresponding JSON Patch operations to the subscribers, which avoids resending the fields that have not changed.
After an initial snapshot event has been emitted, it is followed by patch events whenever Streams detects a change in the published content.
For example, given a first snapshot published in a topic:
{
  "baz": "qux",
  "foo": "bar"
}
The subscriber receives an initial event of type snapshot containing the complete data set, as follows:
{
  "baz": "qux",
  "foo": "bar"
}
If the publisher publishes a second snapshot:
{
  "baz": "boo",
  "hello": ["world"]
}
The subscriber now receives an event of type patch containing the list of patch operations to apply to the initial snapshot:
[
  { "op": "replace", "path": "/baz", "value": "boo" },
  { "op": "add", "path": "/hello", "value": ["world"] },
  { "op": "remove", "path": "/foo" }
]
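To make the example concrete, the patch above can be reproduced with a small Python sketch that compares two snapshots key by key. This is an illustration only: it handles top-level keys and is not the algorithm Streams uses internally. The function name is hypothetical.

```python
def diff_top_level(previous: dict, current: dict) -> list:
    """Compute JSON Patch operations for top-level keys only (illustrative)."""

    def pointer(key: str) -> str:
        # Escape '~' and '/' as required by JSON Pointer (RFC 6901).
        return "/" + key.replace("~", "~0").replace("/", "~1")

    operations = []
    for key, value in current.items():
        if key not in previous:
            operations.append({"op": "add", "path": pointer(key), "value": value})
        elif previous[key] != value:
            operations.append({"op": "replace", "path": pointer(key), "value": value})
    for key in previous:
        if key not in current:
            operations.append({"op": "remove", "path": pointer(key)})
    return operations
```

Called with the first and second snapshots from the example, it returns the same replace, add, and remove operations shown above.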
A patch is a JSON document that describes the difference between two JSON documents. It is represented by an array of operations to apply to the previous version of the document.
A patch operation takes the following fields:
- op: defines the type of operation. For example, add, remove, replace.
- path: defines where the operation applies in the document (JSON Pointer).
- value: (optional) defines the value to apply: a raw JSON literal, object, or array.
You can use a patch to modify an existing document. The JSON Patch format enables Streams to save bandwidth by pushing only the differences between two versions of published content.
Streams supports the following JSON Patch operations.
add
Adds a value to an object or inserts it into an array. In the case of an array, the value is inserted before the given index. The - character can be used instead of an index to insert values at the end of an array.
{ "op": "add", "path": "/hello", "value": ["world"] }
replace
Replaces a value. Equivalent to remove followed by add.
{ "op": "replace", "path": "/baz", "value": "boo" }
remove
Removes a value from an object or array.
{ "op": "remove", "path": "/foo" }
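On the subscriber side, each patch event has to be applied to the locally held copy of the document. The following Python sketch applies the three operations described above. It is a minimal illustration with hypothetical function names, it does not support operations on the document root, and a maintained JSON Patch library is usually a better choice in production.

```python
import copy


def _tokens(path: str) -> list:
    """Split a JSON Pointer (RFC 6901) into unescaped reference tokens."""
    if not path.startswith("/"):
        raise ValueError(f"unsupported or invalid JSON Pointer: {path!r}")
    return [t.replace("~1", "/").replace("~0", "~") for t in path[1:].split("/")]


def apply_patch(document, operations):
    """Apply add/replace/remove operations and return a new document."""
    doc = copy.deepcopy(document)  # leave the caller's document untouched
    for operation in operations:
        op, tokens = operation["op"], _tokens(operation["path"])
        if op not in ("add", "replace", "remove"):
            raise ValueError(f"unsupported operation: {op!r}")
        # Walk down to the container that holds the target location.
        parent = doc
        for token in tokens[:-1]:
            parent = parent[int(token)] if isinstance(parent, list) else parent[token]
        last = tokens[-1]
        if isinstance(parent, list):
            if op == "add":
                # "-" appends; otherwise the value is inserted before the index.
                index = len(parent) if last == "-" else int(last)
                if index > len(parent):
                    raise IndexError(f"index {index} is out of range")
                parent.insert(index, operation["value"])
            elif op == "replace":
                parent[int(last)] = operation["value"]
            else:
                del parent[int(last)]
        else:
            if op != "add" and last not in parent:
                raise KeyError(last)
            if op == "remove":
                del parent[last]
            else:
                parent[last] = operation["value"]
    return doc
```

Applying the example patch to the first snapshot yields the second snapshot, which is how a subscriber keeps its copy in sync without receiving the full content again.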
snapshot-only subscription mode
In this mode, the subscriber receives events of type snapshot only when Streams detects a change.
If the publisher publishes the same payload twice:
{
  "baz": "qux",
  "foo": "bar"
}
the subscriber does not receive any event for the second publication.
If the publisher publishes a payload containing changes compared to the previously published payload:
{
  "baz": "boo",
  "hello": ["world"]
}
the subscriber receives an event of type snapshot containing the full data set:
{
  "baz": "boo",
  "hello": ["world"]
}
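The behavior described for this mode can be pictured as a filter that remembers the previous payload and emits a snapshot only when the new one differs. The Python sketch below illustrates the idea under stated assumptions: the class name and the event envelope with `type` and `data` keys are hypothetical, and this is not Streams' implementation.

```python
import copy

_NOTHING = object()  # marks "no payload published yet"


class SnapshotOnlyFilter:
    """Emits a snapshot event only when the published payload has changed."""

    def __init__(self) -> None:
        self._previous = _NOTHING

    def on_publish(self, payload):
        """Return a snapshot event, or None if the payload is unchanged."""
        if self._previous is not _NOTHING and payload == self._previous:
            return None
        # Keep a private copy so later mutations by the caller are detected.
        self._previous = copy.deepcopy(payload)
        return {"type": "snapshot", "data": payload}
```

Publishing the same payload twice produces an event for the first call and `None` for the second; a changed payload produces a new snapshot event.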
You can restrict the subscription modes available to subscribers of the topic by setting the allowedSubscriptionModes property to the list of modes you want to allow:
{
  "name": "topic-1570803096398",
  ...
  "subscribers": {
    "sse": {
      "allowedSubscriptionModes": [
        "snapshot-only",
        "snapshot-patch"
      ],
      "defaultSubscriptionMode": "snapshot-patch"
    }
    ...
  },
  ...
}
You can define the default subscription mode with the defaultSubscriptionMode attribute in the associated subscriber’s configuration. If you don’t define a default subscription mode, Streams derives one, either from the first item of allowedSubscriptionModes or from the publisher payload type:
Publisher Payload type | defaultSubscriptionMode |
---|---|
snapshot | snapshot-patch |
event | event |
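The fallback rules can be summarized in a short Python sketch. The order of precedence used here (explicit default, then the first allowed mode, then the payload type) is an assumption, since the text does not state which fallback wins when both apply. The function name is hypothetical.

```python
# Fallback per publisher payload type, taken from the table above.
DEFAULT_MODE_BY_PAYLOAD_TYPE = {
    "snapshot": "snapshot-patch",
    "event": "event",
}


def resolve_default_mode(subscriber: dict, payload_type: str) -> str:
    """Return the subscription mode used when a subscriber does not request one."""
    explicit = subscriber.get("defaultSubscriptionMode")
    if explicit:
        return explicit
    allowed = subscriber.get("allowedSubscriptionModes") or []
    if allowed:
        # Assumed precedence: the first allowed mode wins over the payload type.
        return allowed[0]
    return DEFAULT_MODE_BY_PAYLOAD_TYPE[payload_type]
```

For instance, a subscriber entry that only sets allowedSubscriptionModes to snapshot-only resolves to snapshot-only under this assumed precedence, while an entry with neither attribute falls back to the table.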