Using kafkaconnect

Lenses InfluxDB Sink

In this section, we use kafkaconnect to create an instance of the Lenses InfluxDB Sink connector. We show the connector in action by producing messages to a Kafka topic and querying the messages recorded in InfluxDB.

Download the docker-compose file and start the services:

docker-compose up -d

Create the foo topic in Kafka:

docker-compose exec broker kafka-topics --bootstrap-server broker:9092 --create --topic foo --partitions 1 --replication-factor 1
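
The same step can also be scripted from Python with the confluent-kafka client. The sketch below is an alternative to the kafka-topics CLI and assumes the broker port is mapped to the host as localhost:9092.

# Minimal sketch: create the foo topic with the confluent-kafka AdminClient.
# The localhost:9092 address is an assumption about the docker-compose port mapping.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics([NewTopic("foo", num_partitions=1, replication_factor=1)])
for topic, future in futures.items():
    future.result()  # raises if the topic could not be created
    print(f"Created topic {topic}")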

Create the mydb database in InfluxDB:

docker-compose exec influxdb influx -execute "CREATE DATABASE mydb"
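
If you prefer Python, the database can also be created with the influxdb client library (InfluxDB 1.x); this sketch assumes InfluxDB is reachable from the host at localhost:8086.

# Minimal sketch: create the mydb database with the influxdb Python client.
from influxdb import InfluxDBClient

client = InfluxDBClient(host="localhost", port=8086)
client.create_database("mydb")
print(client.get_list_database())  # should include {'name': 'mydb'}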

Use kafka-connect-manager to create an instance of the InfluxDB Sink connector:

docker-compose run kafkaconnect create influxdb-sink -d mydb foo

You can check if the connector is running by using the status command:

docker-compose run kafkaconnect status influxdb-sink
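
The connector status can also be retrieved directly from the Kafka Connect REST API. The sketch below assumes the connect service exposes the default REST port 8083 on the host.

# Minimal sketch: query the connector status through the Kafka Connect REST API.
import requests

resp = requests.get("http://localhost:8083/connectors/influxdb-sink/status")
resp.raise_for_status()
status = resp.json()
print(status["connector"]["state"])                 # e.g. RUNNING
print([task["state"] for task in status["tasks"]])  # task states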

Now use the kafka-avro-console-producer utility to produce Avro messages to the foo topic. The Avro schema for the message value is specified with the --property command-line option. Note that because the utility runs inside the Schema Registry Docker container, we need to use the internal broker port here:

docker-compose exec schema-registry kafka-avro-console-producer --bootstrap-server broker:29092 --topic foo --property value.schema='{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":"float"}]}'
{"bar": "John Doe", "baz": 1}
{"bar": "John Doe", "baz": 2}
Ctrl+D
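
The same messages can be produced from Python with the confluent-kafka package. The sketch below uses the legacy AvroProducer helper and assumes the broker and Schema Registry are reachable from the host at localhost:9092 and localhost:8081.

# Minimal sketch: produce Avro messages to the foo topic with AvroProducer.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.loads(
    '{"type":"record", "name":"foo", "fields":'
    '[{"name":"bar","type":"string"}, {"name":"baz","type":"float"}]}'
)

producer = AvroProducer(
    {"bootstrap.servers": "localhost:9092",
     "schema.registry.url": "http://localhost:8081"},
    default_value_schema=value_schema,
)
producer.produce(topic="foo", value={"bar": "John Doe", "baz": 1.0})
producer.produce(topic="foo", value={"bar": "John Doe", "baz": 2.0})
producer.flush()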

Finally, query the results in InfluxDB; you should get an output like this:

docker-compose exec influxdb influx -database mydb -execute "SELECT * FROM foo"
name: foo
time                bar      baz
----                ---      ---
1611597963632953639 John Doe 1
1611597964771771862 John Doe 2
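
The same query can be run from Python with the influxdb client library; as before, the localhost:8086 address is an assumption about the port mapping.

# Minimal sketch: query the foo measurement with the influxdb Python client.
from influxdb import InfluxDBClient

client = InfluxDBClient(host="localhost", port=8086, database="mydb")
result = client.query("SELECT * FROM foo")
for point in result.get_points(measurement="foo"):
    print(point["time"], point["bar"], point["baz"])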

You can inspect the connect service logs using:

docker-compose logs connect

Avro records for both key and value

To produce Avro records for both key and value, use:

docker-compose exec schema-registry kafka-avro-console-producer --bootstrap-server broker:29092 --topic foo --property parse.key=true --property key.schema='{"type":"record", "name":"id", "fields":[{"name":"id", "type":"int"}]}' --property value.schema='{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":"float"}]}'
{"id":1}      {"bar": "John Doe","baz": 1}
Ctrl+D

Note that this command uses <TAB>, the default separator between key and value; this can be changed with the --property key.separator="<separator>" option.
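
In Python, the equivalent is to pass a key schema and a key to the AvroProducer shown earlier; this is a sketch under the same localhost port assumptions.

# Minimal sketch: produce an Avro record with both key and value.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

key_schema = avro.loads('{"type":"record", "name":"id", "fields":[{"name":"id", "type":"int"}]}')
value_schema = avro.loads('{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":"float"}]}')

producer = AvroProducer(
    {"bootstrap.servers": "localhost:9092",
     "schema.registry.url": "http://localhost:8081"},
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)
producer.produce(topic="foo", key={"id": 1}, value={"bar": "John Doe", "baz": 1.0})
producer.flush()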

Recording arrays in InfluxDB

InfluxDB does not support array fields, so the connector handles Avro arrays by flattening them before writing to InfluxDB. The following command produces an Avro message with an array field:

docker-compose exec schema-registry kafka-avro-console-producer --bootstrap-server broker:29092 --topic foo --property value.schema='{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":{"type":"array","items":"float"}}]}'
{"bar": "John Doe","baz": [1,2,3]}
Ctrl+D

which is stored in InfluxDB as:

docker-compose exec influxdb influx -database mydb -execute "SELECT * FROM foo"
name: foo
time                bar      baz0 baz1 baz2
----                ---      ---- ---- ----
1611707507555316950 John Doe 1    2    3
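
The flattening behaviour can be pictured with a small pure-Python sketch; this only illustrates the idea and is not the connector's actual implementation.

# Illustration only: flatten array fields by appending the element index to the
# field name, mimicking the baz -> baz0, baz1, baz2 columns shown above.
def flatten_arrays(record: dict) -> dict:
    flat = {}
    for name, value in record.items():
        if isinstance(value, list):
            for i, item in enumerate(value):
                flat[f"{name}{i}"] = item
        else:
            flat[name] = value
    return flat

print(flatten_arrays({"bar": "John Doe", "baz": [1, 2, 3]}))
# {'bar': 'John Doe', 'baz0': 1, 'baz1': 2, 'baz2': 3}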

Resetting consumer group offsets

When a sink connector is created, a consumer group keeps track of the offsets of each topic configured in the connector. For the InfluxDB Sink connector created above, the following command lists the consumer groups:

docker-compose exec broker kafka-consumer-groups --bootstrap-server localhost:9092 --list
connect-influxdb-sink

The topic offsets for the connect-influxdb-sink consumer group are shown using:

docker-compose exec broker kafka-consumer-groups --bootstrap-server localhost:9092  --describe --offsets  --group connect-influxdb-sink

GROUP                 TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                             HOST            CLIENT-ID
connect-influxdb-sink foo             0          1               1               0               connector-consumer-influxdb-sink-0-896a850d-4cbc-406c-a0c6-afcc7fb31da5 /192.168.80.6   connector-consumer-influxdb-sink-0

The log-end-offset is the offset of the last message sent to Kafka, and the current-offset is the offset of the last message consumed by the connector. The difference between them is the consumer lag. In the example, the connector is configured with a single topic, foo, and all messages produced have been consumed by the connector, so the lag is 0.
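
The same numbers can be computed from Python with confluent-kafka by comparing the group's committed offset with the log-end (high watermark) offset. The sketch below only reads offsets and never subscribes, so it does not join the consumer group; the localhost:9092 address is again an assumption.

# Minimal sketch: compute the consumer lag for the connect-influxdb-sink group.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "connect-influxdb-sink",
    "enable.auto.commit": False,
})
tp = TopicPartition("foo", 0)
committed = consumer.committed([tp], timeout=10)[0].offset  # current-offset
low, high = consumer.get_watermark_offsets(tp, timeout=10)  # high is the log-end-offset
print(f"current-offset={committed} log-end-offset={high} lag={high - committed}")
consumer.close()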

It is possible to force the connector to consume the messages produced to the foo topic again by resetting the consumer group offsets.

The following commands make the connector write the messages to InfluxDB again by resetting the consumer group offsets to the earliest offset available in Kafka.

First, we check the consumer group state:

docker-compose exec broker kafka-consumer-groups --bootstrap-server localhost:9092 --group connect-influxdb-sink --describe --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
connect-influxdb-sink     localhost:9092 (1)        range                Stable          1

To reset offsets, we need to change the consumer group state to Empty. To do that, we delete the connector that is using the consumer group:

docker-compose run kafkaconnect delete influxdb-sink

Now we reset the consumer group offsets:

docker-compose exec broker kafka-consumer-groups --bootstrap-server localhost:9092 --group connect-influxdb-sink --topic foo --reset-offsets --to-earliest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
connect-influxdb-sink          foo                            0          0
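
For completeness, the same reset can be sketched in Python by committing offset 0 under the connector's group id while the group is empty; this is an assumption-level alternative to the kafka-consumer-groups CLI, not something kafkaconnect does for you.

# Sketch: commit offset 0 for partition 0 of foo under the connect-influxdb-sink group.
# Run this only after the connector has been deleted and the group is Empty.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "connect-influxdb-sink",
    "enable.auto.commit": False,
})
consumer.commit(offsets=[TopicPartition("foo", 0, 0)], asynchronous=False)
consumer.close()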

Finally, recreate the connector:

docker-compose run kafkaconnect create influxdb-sink -d mydb foo

When deploying multiple InfluxDB Sink connectors that consume the same topics, a possible scenario is to configure one connector to consume from the earliest offsets to recover historical data from Kafka into InfluxDB (a "repairer" connector), and a second connector to consume from the latest offsets to keep up with the current data in InfluxDB.