Development guide

Development workflow

Set up kafkaconnect for local development.

  1. Clone the repo from GitHub:

$ git clone https://github.com/lsst-sqre/kafka-connect-manager.git
  2. Install your local copy into a virtualenv:

$ cd kafka-connect-manager
$ virtualenv -p python3 venv
$ source venv/bin/activate
$ make update
  3. Create a branch for local development:

$ git checkout -b name-of-your-bugfix-or-feature
  4. Check that your changes pass the linter and tests:

$ tox -e lint,typing,py37
  5. Commit your changes and push your branch to GitHub:

$ git add .
$ git commit -m "Your detailed description of your changes."
$ git push origin name-of-your-bugfix-or-feature
  6. Submit a pull request through the GitHub website.

Running locally with docker-compose

docker-compose provides the additional services you need to run kafka-connect-manager locally.

Start the kafka broker, zookeeper and connect services:

$ cd kafka-connect-manager/tests
$ docker-compose up -d broker zookeeper connect

Example: Creating an instance of the influxdb-sink connector

The following command creates an instance of the influxdb-sink connector configured to write the three Kafka topics foo, bar, and baz to the mydb InfluxDB database.

$ kafkaconnect create influxdb-sink --influxdb-url http://influxdb:8086 --database mydb foo bar baz
Discovering Kafka topics...
Validation returned 0 error(s).
Uploading influxdb-sink connector configuration...

The create command provides sensible defaults for the connector configuration. Use --help to see the available options for each connector.

You can inspect the connector configuration with the config command:

$ kafkaconnect config influxdb-sink
{
  "connect.influx.db": "mydb",
  "connect.influx.error.policy": "THROW",
  "connect.influx.kcql": "INSERT INTO foo SELECT * FROM foo WITHTIMESTAMP sys_time();INSERT INTO bar SELECT * FROM bar WITHTIMESTAMP sys_time();INSERT INTO baz SELECT * FROM baz WITHTIMESTAMP sys_time()",
  "connect.influx.max.retries": "10",
  "connect.influx.password": "",
  "connect.influx.retry.interval": "60000",
  "connect.influx.timestamp": "sys_time()",
  "connect.influx.url": "http://influxdb:8086",
  "connect.influx.username": "-",
  "connect.progress.enabled": "false",
  "connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
  "name": "influxdb-sink",
  "tasks.max": "1",
  "topics": "foo,bar,baz"
}
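The connect.influx.kcql value above follows a simple per-topic pattern: one INSERT statement per topic, joined by semicolons. As an illustration only (not the tool's actual implementation), a KCQL string like this could be built from a topic list as follows:

```python
def build_kcql(topics, timestamp="sys_time()"):
    """Build an influxdb-sink KCQL string: one INSERT statement per topic."""
    statements = [
        f"INSERT INTO {topic} SELECT * FROM {topic} WITHTIMESTAMP {timestamp}"
        for topic in topics
    ]
    # Statements are joined with ';' and no trailing separator,
    # matching the format shown in the config output above.
    return ";".join(statements)


print(build_kcql(["foo", "bar", "baz"]))
```

Each topic maps to an InfluxDB measurement of the same name, with sys_time() supplying the write timestamp.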

You can check the status of the connector with the status command:

$ kafkaconnect status influxdb-sink
{
  "connector": {
      "state": "RUNNING",
      "worker_id": "connect:8083"
  },
  "name": "influxdb-sink",
  "tasks": [
      {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "connect:8083"
      }
  ],
  "type": "sink"
}
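Because the status command emits JSON, it is easy to consume from a script. A minimal sketch (assuming a payload shaped like the output above) that verifies the connector and every task report RUNNING:

```python
import json


def is_healthy(status: dict) -> bool:
    """Return True if the connector and all of its tasks are RUNNING."""
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])


# Sample payload matching the status output shown above.
payload = json.loads("""
{
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "name": "influxdb-sink",
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
  "type": "sink"
}
""")

print(is_healthy(payload))  # True
```

A task in a FAILED state would cause the check to return False, which makes this useful as a simple liveness probe in deployment scripts.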